amazonkinisis客户端库multilangdaemon的python接口
kclp的Python项目详细描述
#kclpy
这是[amazon kinisis client library for python]的一个分支(https://github.com/awslabs/amazon kinisis client python),
旨在使用[Amazon的Kinisis客户端库(KCL)(http://docs.aws.amazon.com/kinisis/latest/dev/developing-consumers-with-kcl.html)多语言后台程序接口简化对Kinisis流的使用。
这个库为kcl提供了一个python api。
有关RecordProcessor接口的详细信息,请参见http://docs.aws.amazon.com/kinisis/latest/dev/kinisis-record-processor-implementation-app-py.html。
```python
import kclpy
import json
Class mystreamprocessor(kclpy.recordprocessor):
def process_record(self,数据,分区键,序列号:
尝试:
假设传入的运动记录是json
data=json.loads(data)
user=data.get("user")
#使用检查点策略
返回true
我们无能为力")
return
kclpy.start(mystreamprocessor())
附带的[sylvite]库是一个可执行的jar,它将启动我们的记录处理器并为它提供记录。
请参阅[sylvite]库(https://github.com/empiricalresults/sylvite)了解详细信息和预建的jar。
``sh
>;Java-jar Salvi.jar——CONFIG= MyApp.属性'BR/'`BR/> BR/>日志> BR/> BR/>此库使用标准Python日志记录模块(命名空间'KCLPY’下的所有日志)。KCL多守护程序库期望stdout上有格式良好的数据,因此请确保将日志配置为使用stderr或文件。不要在处理器中使用打印语句!
BR/>背景
BR/>理解使用KCL的多LAN守护进程时的关键概念是有一个Java进程与KyExcel API进行所有通信,以及从STDIN/STDUT读取和写入的语言不可知子进程。这与hadoop流的工作原理非常相似。为了消耗这个流,我们需要启动一个Java进程,它将启动一个子进程,它将实际处理处理流数据。
BR/>虽然这听起来很复杂,但是在KCL上构建使我们能够实现所有检查点、重新定位和监视的优点。烘焙到KCL中的工作。kcl也由awslabs团队维护,因此任何未来的增强功能都将免费处理。
\recordprocessor
kclpy基于awslabs的示例代码,只需对日志和检查点进行一些小的调整。
\api
请参阅[亚马逊文档](http://docs.aws.amazon.com/kinisis/latest/dev/kinisis record processor implementation app py.html)。这个fork与原始实现保持兼容。
kclpy允许您自定义检查点行为。以下Kwargs可以传递给kclpy.recordprocessor:
*检查点频率秒-以固定间隔(秒)检查点。
*记录每个检查点-以固定数量处理的记录检查点。
``python
def main():
每60秒_分钟处理器=mystreamprocessor(检查点频率秒数=60)
您可以通过在*process_record*调用中返回true来强制执行explict检查点。但是要注意的是,这样做每一条记录都会导致对dynamodb表的大量写入。
``python
import kclpy
def process_data(data):
告诉库检查点,这样我们就不会再次处理它了。
返回true
class mystreamprocessor(kclpy.recordprocessor):
def process_record(self,data,partition_key,序列号:
shall_checkpoint=process_data(data)
return shall_checkpoint
def main():
=mystreamprocessor(
检查点频率秒数=0,
每个检查点的记录数=0
)
todo:启动处理器
这是[amazon kinisis client library for python]的一个分支(https://github.com/awslabs/amazon kinisis client python),
旨在使用[Amazon的Kinisis客户端库(KCL)(http://docs.aws.amazon.com/kinisis/latest/dev/developing-consumers-with-kcl.html)多语言后台程序接口简化对Kinisis流的使用。
这个库为kcl提供了一个python api。
有关RecordProcessor接口的详细信息,请参见http://docs.aws.amazon.com/kinisis/latest/dev/kinisis-record-processor-implementation-app-py.html。
```python
import kclpy
import json
Class mystreamprocessor(kclpy.recordprocessor):
def process_record(self,数据,分区键,序列号:
尝试:
假设传入的运动记录是json
data=json.loads(data)
user=data.get("user")
#使用检查点策略
返回true
我们无能为力")
return
附带的[sylvite]库是一个可执行的jar,它将启动我们的记录处理器并为它提供记录。
请参阅[sylvite]库(https://github.com/empiricalresults/sylvite)了解详细信息和预建的jar。
``sh
>;Java-jar Salvi.jar——CONFIG= MyApp.属性'BR/'`BR/> BR/>日志> BR/> BR/>此库使用标准Python日志记录模块(命名空间'KCLPY’下的所有日志)。KCL多守护程序库期望stdout上有格式良好的数据,因此请确保将日志配置为使用stderr或文件。不要在处理器中使用打印语句!
BR/>背景
BR/>理解使用KCL的多LAN守护进程时的关键概念是有一个Java进程与KyExcel API进行所有通信,以及从STDIN/STDUT读取和写入的语言不可知子进程。这与hadoop流的工作原理非常相似。为了消耗这个流,我们需要启动一个Java进程,它将启动一个子进程,它将实际处理处理流数据。
BR/>虽然这听起来很复杂,但是在KCL上构建使我们能够实现所有检查点、重新定位和监视的优点。烘焙到KCL中的工作。kcl也由awslabs团队维护,因此任何未来的增强功能都将免费处理。
\recordprocessor
kclpy基于awslabs的示例代码,只需对日志和检查点进行一些小的调整。
\api
请参阅[亚马逊文档](http://docs.aws.amazon.com/kinisis/latest/dev/kinisis record processor implementation app py.html)。这个fork与原始实现保持兼容。
kclpy允许您自定义检查点行为。以下Kwargs可以传递给kclpy.recordprocessor:
*检查点频率秒-以固定间隔(秒)检查点。
*记录每个检查点-以固定数量处理的记录检查点。
``python
def main():
每60秒_分钟处理器=mystreamprocessor(检查点频率秒数=60)
您可以通过在*process_record*调用中返回true来强制执行explict检查点。但是要注意的是,这样做每一条记录都会导致对dynamodb表的大量写入。
``python
import kclpy
def process_data(data):
告诉库检查点,这样我们就不会再次处理它了。
返回true
class mystreamprocessor(kclpy.recordprocessor):
def process_record(self,data,partition_key,序列号:
shall_checkpoint=process_data(data)
return shall_checkpoint
def main():
=mystreamprocessor(
检查点频率秒数=0,
每个检查点的记录数=0
)
todo:启动处理器