IBM Streams Elasticsearch integration
Project description for the streamsx.elasticsearch Python package
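Overview
Provides functions to store tuple data as JSON documents in Elasticsearch indices.
This package exposes the com.ibm.streamsx.elasticsearch toolkit as Python methods for use with the Streaming Analytics service on IBM Cloud and with IBM Streams, including IBM Cloud Pak for Data.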
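To make "tuple data stored as JSON documents" concrete, here is a minimal sketch that uses only the Python standard library, not the toolkit itself. The `Reading` type, its attributes `id` and `msg`, and the helper `tuple_to_document` are hypothetical names chosen for illustration. The actual mapping from tuple attributes to document fields is decided by the toolkit and may differ.

```python
import json
from typing import NamedTuple


class Reading(NamedTuple):
    """Hypothetical tuple type; not part of streamsx.elasticsearch."""
    id: int
    msg: str


def tuple_to_document(tup) -> str:
    """Serialize a named tuple to a JSON string, one field per attribute."""
    return json.dumps(tup._asdict())


doc = tuple_to_document(Reading(id=1, msg="Hello"))
print(doc)
```

Each attribute of the tuple becomes one field of the document, which is the general shape of what ends up in an index.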
Sample
A simple example of a Streams application that writes string messages to an index:
    from streamsx.topology.topology import *
    from streamsx.topology.schema import CommonSchema
    from streamsx.topology.context import submit
    import streamsx.elasticsearch as es

    topo = Topology('ElasticsearchHelloWorld')
    # Source stream of two string tuples
    s = topo.source(['Hello', 'World!']).as_string()
    # Write each tuple to the index 'test-index-cloud'
    es.bulk_insert(s, 'test-index-cloud')
    submit('STREAMING_ANALYTICS_SERVICE', topo)
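A simple example of a Streams application that writes JSON messages to an index, with a dynamic index name taken from an attribute of the stream: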
    from streamsx.topology.topology import *
    from streamsx.topology.schema import CommonSchema, StreamSchema
    from streamsx.topology.context import submit
    import streamsx.elasticsearch as es

    # Each tuple carries its target index name and the JSON document to store
    schema = StreamSchema('tuple<rstring indexName, rstring document>')

    topo = Topology()
    s = topo.source([('idx1', '{"msg":"This is message number 1"}'),
                     ('idx2', '{"msg":"This is message number 2"}')])
    # Convert the Python tuples to the structured schema
    s = s.map(lambda x: x, schema=schema)
    # The index name is read per tuple from the 'indexName' attribute
    es.bulk_insert_dynamic(s, index_name_attribute='indexName', message_attribute='document')
    submit('STREAMING_ANALYTICS_SERVICE', topo)
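The JSON strings in the dynamic-index example are written by hand, which becomes error-prone once messages contain quotes or nested data. As a sketch only, the `(indexName, document)` pairs could instead be built with the standard `json` module. The helper name `make_records` and its input format are assumptions made for illustration and are not part of streamsx.elasticsearch.

```python
import json


def make_records(messages_by_index):
    """Build (indexName, document) pairs for tuple<rstring indexName, rstring document>.

    messages_by_index: iterable of (index name, JSON-serializable payload).
    Hypothetical helper; not part of the streamsx.elasticsearch API.
    """
    records = []
    for index_name, payload in messages_by_index:
        # json.dumps handles quoting and escaping of the payload
        records.append((index_name, json.dumps(payload)))
    return records


records = make_records([
    ("idx1", {"msg": "This is message number 1"}),
    ("idx2", {"msg": "This is message number 2"}),
])
```

The resulting list has the same shape as the literal list passed to `topo.source(...)` in the example, so it could presumably be used in its place.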