ksql rest api的python包装器

ksql的Python项目详细描述


ksql rest api的python包装器。使用此库可以轻松地与ksql rest api交互。

支持的ksql版本:5.x

https://travis-ci.org/bryanyang0528/ksql-python.svg?branch=masterhttps://codecov.io/gh/bryanyang0528/ksql-python/branch/master/graph/badge.svg

安装

pip install ksql

git clone https://github.com/bryanyang0528/ksql-python
cd ksql-python
python setup.py install

开始

设置ksql

这是ksql的github页面。https://github.com/confluentinc/ksql

如果您已经安装了开源confluent cli(例如,通过安装confluent开源或企业平台),则可以使用一个命令启动ksql及其依赖项:

confluent start ksql-server

ksql python api的设置

  • ksql api的设置:
fromksqlimportKSQLAPIclient=KSQLAPI('http://ksql-server:8088')
  • 启用日志记录的ksql api设置:
importloggingfromksqlimportKSQLAPIlogging.basicConfig(level=logging.DEBUG)client=KSQLAPI('http://ksql-server:8088')

选项

OptionTypeRequiredDescription
^{tt1}$stringyesYour ksql-server url. Example: ^{tt2}$
^{tt3}$integernoTimout for Requests. Default: ^{tt4}$

主要方法

ksql

此方法可用于某些ksql功能,而其他特定方法(如querycreate_streamcreate_stream_as)不支持这些功能。 下面的示例演示如何执行show tables语句:

client.ksql('show tables')
  • 示例响应[{'tables': {'statementText': 'show tables;', 'tables': []}}]

查询

它将执行sql查询并继续监听流数据。

client.query('select * from table1')

此命令返回生成器。它可以打印,例如通过next(query)或for循环读取其值。下面是一个完整的示例:

fromksqlimportKSQLAPIclient=KSQLAPI('http://localhost:8088')query=client.query('select * from table1')foriteminquery:print(item)
  • 示例响应

    {"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
    

简化的api

创建流/创建表

client.create_stream(table_name=table_name,columns_type=columns_type,topic=topic,value_format=value_format)

选项

OptionTypeRequiredDescription
^{tt10}$stringyesname of stream/table
^{tt11}$listyesex:^{tt12}$
^{tt13}$stringyesKafka topic
^{tt14}$stringno^{tt15}$ (Default) or ^{tt16}$ or ^{tt17}$
^{tt18}$stringfor TableKey (used for JOINs)
  • 回答
If create table/stream succeed:
return True
If failed:raise a CreateError(respose_from_ksql_server)

将流创建为

用于将流创建为select的简化api

client.create_stream_as(table_name=table_name,select_columns=select_columns,src_table=src_table,kafka_topic=kafka_topic,value_format=value_format,conditions=conditions,partition_by=partition_by,**kwargs)
CREATESTREAM<table_name>[WITH(kafka_topic=<kafka_topic>,value_format=<value_format>,property_name=expression...)]ASSELECT<select_columns>FROM<src_table>[WHERE<conditions>]PARTITIONBY<partition_by>];

选项

OptionTypeRequiredDescription
^{tt10}$stringyesname of stream/table
^{tt20}$listyesyou can select ^{tt21}$ or ^{tt22}$
^{tt23}$stringyesname of source table
^{tt24}$stringnoThe name of the Kafka topic of this new stream(table).
^{tt14}$stringno^{tt16}$, ^{tt27}$
^{tt28}$stringnoThe conditions in the where clause.
^{tt29}$stringnoData will be distributed across partitions by this column.
^{tt30}$pairnoplease provide ^{tt31}$ pairs. Please see more options.

ksql连接

还不支持通过显式方法在流和表之间进行ksql连接,但是可以使用ksql方法进行如下操作:

client.ksql("CREATE STREAM join_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='join_per_user') AS SELECT Time, Amount FROM source c INNER JOIN users u on c.user = u.userid WHERE u.USERID = 1")

文件上传

上传

从.ksql文件运行命令。只能支持ksql命令,而不支持流式查询。

fromksqlimportFileUploadpointer=FileUpload('http://ksql-server:8080')pointer.upload('rules.ksql')

选项

OptionTypeRequiredDescription
^{tt33}$stringyesname of file containing the rules
  • 回答
If ksql-commands succesfully executed:
return (List of server response for all commands)
If failed:raise the appropriate error

更多选项

官方文件中有更多的属性(分区、副本等)。

KSQL Syntax Reference

  • 回答
If create table/stream succeed:
return True
If failed:raise a CreatError(respose_from_ksql_server)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
找不到足够的连续内存会导致OOM吗?   java如何计算一个矩形可以放入另一个矩形的次数?   谷歌地图api java   java Autowired批注在AuthenticationSuccessHandler中返回null   Java电话号码格式正则表达式   eclipse我希望能够同时选择多个复选框?Java SWT   java j2objc可以用于生成不适用于iOS的目标C代码吗?   使用cUrl将PHP post数组转换为java servlet   java playpac4j和Play 2.5:@requireAuthentication注释导致stacktrace   java为什么在Javamail中连接超时?   java使用SwingUtilities。main方法中的invokeLater()   如何在名为from Unity的Java插件中创建Android处理程序