ksql rest api的python包装器
ksql的Python项目详细描述
ksql rest api的python包装器。使用此库可以轻松地与ksql rest api交互。
支持的ksql版本:5.x
安装
pip install ksql
或
git clone https://github.com/bryanyang0528/ksql-python
cd ksql-python
python setup.py install
开始
设置ksql
这是ksql的github页面。https://github.com/confluentinc/ksql
如果您已经安装了开源confluent cli(例如,通过安装confluent开源或企业平台),则可以使用一个命令启动ksql及其依赖项:
confluent start ksql-server
ksql python api的设置
- ksql api的设置:
fromksqlimportKSQLAPIclient=KSQLAPI('http://ksql-server:8088')
- 启用日志记录的ksql api设置:
importloggingfromksqlimportKSQLAPIlogging.basicConfig(level=logging.DEBUG)client=KSQLAPI('http://ksql-server:8088')
选项
Option | Type | Required | Description |
---|---|---|---|
^{tt1}$ | string | yes | Your ksql-server url. Example: ^{tt2}$ |
^{tt3}$ | integer | no | Timout for Requests. Default: ^{tt4}$ |
主要方法
ksql
此方法可用于某些ksql功能,而其他特定方法(如query、create_stream或create_stream_as)不支持这些功能。 下面的示例演示如何执行show tables语句:
client.ksql('show tables')
- 示例响应[{'tables': {'statementText': 'show tables;', 'tables': []}}]
查询
它将执行sql查询并继续监听流数据。
client.query('select * from table1')
此命令返回生成器。它可以打印,例如通过next(query)或for循环读取其值。下面是一个完整的示例:
fromksqlimportKSQLAPIclient=KSQLAPI('http://localhost:8088')query=client.query('select * from table1')foriteminquery:print(item)
示例响应
{"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null} {"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null} {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null} {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
简化的api
创建流/创建表
client.create_stream(table_name=table_name,columns_type=columns_type,topic=topic,value_format=value_format)
选项
Option | Type | Required | Description |
---|---|---|---|
^{tt10}$ | string | yes | name of stream/table |
^{tt11}$ | list | yes | ex:^{tt12}$ |
^{tt13}$ | string | yes | Kafka topic |
^{tt14}$ | string | no | ^{tt15}$ (Default) or ^{tt16}$ or ^{tt17}$ |
^{tt18}$ | string | for Table | Key (used for JOINs) |
- 回答
If create table/stream succeed: | |
---|---|
return True | |
If failed: | raise a CreateError(respose_from_ksql_server) |
将流创建为
用于将流创建为select的简化api
client.create_stream_as(table_name=table_name,select_columns=select_columns,src_table=src_table,kafka_topic=kafka_topic,value_format=value_format,conditions=conditions,partition_by=partition_by,**kwargs)
CREATESTREAM<table_name>[WITH(kafka_topic=<kafka_topic>,value_format=<value_format>,property_name=expression...)]ASSELECT<select_columns>FROM<src_table>[WHERE<conditions>]PARTITIONBY<partition_by>];
选项
Option | Type | Required | Description |
---|---|---|---|
^{tt10}$ | string | yes | name of stream/table |
^{tt20}$ | list | yes | you can select ^{tt21}$ or ^{tt22}$ |
^{tt23}$ | string | yes | name of source table |
^{tt24}$ | string | no | The name of the Kafka topic of this new stream(table). |
^{tt14}$ | string | no | ^{tt16}$, ^{tt27}$ |
^{tt28}$ | string | no | The conditions in the where clause. |
^{tt29}$ | string | no | Data will be distributed across partitions by this column. |
^{tt30}$ | pair | no | please provide ^{tt31}$ pairs. Please see more options. |
ksql连接
还不支持通过显式方法在流和表之间进行ksql连接,但是可以使用ksql方法进行如下操作:
client.ksql("CREATE STREAM join_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='join_per_user') AS SELECT Time, Amount FROM source c INNER JOIN users u on c.user = u.userid WHERE u.USERID = 1")
文件上传
上传
从.ksql文件运行命令。只能支持ksql命令,而不支持流式查询。
fromksqlimportFileUploadpointer=FileUpload('http://ksql-server:8080')pointer.upload('rules.ksql')
选项
Option | Type | Required | Description |
---|---|---|---|
^{tt33}$ | string | yes | name of file containing the rules |
- 回答
If ksql-commands succesfully executed: | |
---|---|
return (List of server response for all commands) | |
If failed: | raise the appropriate error |
更多选项
官方文件中有更多的属性(分区、副本等)。
- 回答
If create table/stream succeed: | |
---|---|
return True | |
If failed: | raise a CreatError(respose_from_ksql_server) |