没有项目描述
pyconfluent的Python项目详细描述
合流
pyconfluent通过为ksql和schema registry restapi提供包装器,并根据robinhood的faust
和winton的winton-kafka-streams
对kafka streams java包进行深入的pythonic解释,为python带来了最合流的kafka功能。
安装
这个包是为Python3.6编写的,没有在其他版本上进行测试。
pip3 install pyconfluent
使用量
PyConfluent要求Confluent平台及其所有底层服务都在运行。创建类实例时,请确保传入正在运行的代理列表,或将其留空以连接到localhost
。
ksql
import KSQL
k = KSQL() # enter your boostrap_server here if not 'localhost', no port
# create streams from existing topics
sales_stream = k.ksql("CREATE STREAM sales (company VARCHAR, product BIGINT, quantity BIGINT)"
"WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")
reviews_stream = k.ksql("CREATE STREAM reviews (company VARCHAR, product BIGINT, review VARCHAR)"
"WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON';")
# stream to stream join, WITHIN clause required
# creates and populates new kafka topic
enriched_stream = k.ksql("CREATE STREAM enriched AS SELECT"
"sales.company, sales.product, sales.quantity"
"FROM sales LEFT JOIN reviews WITH 1 HOURS"
"ON sales.company = reviews.company")