没有项目描述

pyconfluent的Python项目详细描述


合流

pyconfluent通过为ksql和schema registry restapi提供包装器,并根据robinhood的faust和winton的winton-kafka-streams对kafka streams java包进行深入的pythonic解释,为python带来了最合流的kafka功能。

安装

这个包是为Python3.6编写的,没有在其他版本上进行测试。

pip3 install pyconfluent

使用量

PyConfluent要求Confluent平台及其所有底层服务都在运行。创建类实例时,请确保传入正在运行的代理列表,或将其留空以连接到localhost

ksql

import KSQL

k = KSQL()  # enter your boostrap_server here if not 'localhost', no port

# create streams from existing topics
sales_stream = k.ksql("CREATE STREAM sales (company VARCHAR, product BIGINT, quantity BIGINT)"
                      "WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")

reviews_stream = k.ksql("CREATE STREAM reviews (company VARCHAR, product BIGINT, review VARCHAR)"
                        "WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")

# stream to stream join, WITHIN clause required
# creates and populates new kafka topic
enriched_stream = k.ksql("CREATE STREAM enriched AS SELECT"
                         "sales.company, sales.product, sales.quantity"
                         "FROM sales LEFT JOIN reviews WITH 1 HOURS"
                         "ON sales.company = reviews.company")

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
AmazonS3查找从S3forJava下载的压缩文件的MIME类型   java如何使用Selenium在<span>中查找具有特定文本的元素   python如何使用OpenIEDemo生成自定义三元组。由stanfordnlp提供的java   java遇到“方法不适用”编译错误   java如何使用Mockito在Looper中运行的验证代码。getMainLooper?   类Java目录错误   java在已知其他泛型信息时使用原始类型   java connect()和disconnect()在哪里实现?   java使用PDF Box库拆分PDF,生成的PDF几乎与源PDF文件大小相同   java PowerMockito返回错误的对象   java如何找到TIBCO集合消息的字节编码?   java Basic音乐播放器下一步和上一步按钮   添加模块描述符时,java没有名为“entityManagerFactory”的bean可用   java为什么我的代码不是线程安全的?   eclipse java。引用项目中的类的lang.NoClassDefFoundError