没有项目描述

pyconfluent的Python项目详细描述


合流

pyconfluent通过为ksql和schema registry restapi提供包装器,并根据robinhood的faust和winton的winton-kafka-streams对kafka streams java包进行深入的pythonic解释,为python带来了最合流的kafka功能。

安装

这个包是为Python3.6编写的,没有在其他版本上进行测试。

pip3 install pyconfluent

使用量

PyConfluent要求Confluent平台及其所有底层服务都在运行。创建类实例时,请确保传入正在运行的代理列表,或将其留空以连接到localhost

ksql

import KSQL

k = KSQL()  # enter your boostrap_server here if not 'localhost', no port

# create streams from existing topics
sales_stream = k.ksql("CREATE STREAM sales (company VARCHAR, product BIGINT, quantity BIGINT)"
                      "WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")

reviews_stream = k.ksql("CREATE STREAM reviews (company VARCHAR, product BIGINT, review VARCHAR)"
                        "WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")

# stream to stream join, WITHIN clause required
# creates and populates new kafka topic
enriched_stream = k.ksql("CREATE STREAM enriched AS SELECT"
                         "sales.company, sales.product, sales.quantity"
                         "FROM sales LEFT JOIN reviews WITH 1 HOURS"
                         "ON sales.company = reviews.company")

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何在java swing表单配置文件中显示动态布局   swing Java使用GridLayout和鼠标交互式JPanel创建JFrame   java使用jsp dao和servlet从数据库中的4行中只插入几行   java SqlLite:我们可以选择行作为列吗?   启动glassfish服务器时java获取错误   PersistenceUnit的java部署[…]失败。关闭此PersistenceUnit的所有工厂   java将具有多个关系的实体与集合中的任何元素进行匹配   java对命名模式的建议,该模式在op失败时尝试强制转换返回null   使用jtwitter的java安卓抛出错误   使用Java方法查找数组中最小int的位置   SwingJava。网络气球提示没有出现   java无法使用JavaMail api发送邮件   java HashSet contains()为自定义对象返回false。hashCode()和equals()似乎实现正确   Java:在运行时和编译时向类中添加字段?   java正在服务类中导入jersey和JAXR,这被认为是错误的做法