IBM Streams ElasticSearch集成

streamsx.elasticsearch的Python项目详细描述


概述

提供在ElasticSearch索引中将元组数据存储为JSON文档的函数。

此包将com.ibm.streamsx.elasticsearch工具包公开为python方法,用于上的流分析服务 ibm cloud和ibm streams,包括ibmcloud-pak for data。

样品

streams应用程序向 索引:

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.elasticsearch as es

topo = Topology('ElasticsearchHelloWorld')

s = topo.source(['Hello', 'World!']).as_string()
es.bulk_insert(s, 'test-index-cloud')

submit('STREAMING_ANALYTICS_SERVICE', topo)

streams应用程序向索引写入json消息的简单示例,具有动态索引名(流的一部分):

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.elasticsearch as es

schema = StreamSchema('tuple<rstring indexName, rstring document>')
topo = Topology()
s = topo.source([('idx1','{"msg":"This is message number 1"}'), ('idx2','{"msg":"This is message number 2"}')])
s = s.map(lambda x : x, schema=schema)
es.bulk_insert_dynamic(s, index_name_attribute='indexName', message_attribute='document')

submit('STREAMING_ANALYTICS_SERVICE', topo)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
未绘制java OpenGL顶点   swing无法在Java框架中设置文本   java使用xmemcached客户端访问AWS ElastiCache   java将元素添加到默认arraylist   java从给定位置向后逐个字符地读取文件   java将为此处显示的代码创建多少个对象?   Java异步返回CompletableFuture   java在滚动视图中显示图像   我想为用Netbeans开发的Java项目创建安装程序,并使用Mysql Xampp   java是否可以将COUNT与不同的JPA投影一起使用?   java如何定制javafx。场景图表NumberAxis用于更改20个主要记号的硬编码上限   javajavax。xml。ws。WebServiceException:无法访问WSDL   Jboss中的java多线程   java类不是抽象类,并且不重写抽象方法actionPerformed(ActionEvent)   基于java Spring安全令牌的身份验证   java如何在Spring MVC的自定义验证器中从属性文件中读取参数值   使用OpenCV和java在Android中点击获取图像维度?   java无法解析为变量解析。通用域名格式