amazonkinisis客户端库multilangdaemon的python接口

kclp的Python项目详细描述


#kclpy

这是[amazon kinisis client library for python]的一个分支(https://github.com/awslabs/amazon kinisis client python),
旨在使用[Amazon的Kinisis客户端库(KCL)(http://docs.aws.amazon.com/kinisis/latest/dev/developing-consumers-with-kcl.html)多语言后台程序接口简化对Kinisis流的使用。



这个库为kcl提供了一个python api。

有关RecordProcessor接口的详细信息,请参见http://docs.aws.amazon.com/kinisis/latest/dev/kinisis-record-processor-implementation-app-py.html。


```python
import kclpy
import json


Class mystreamprocessor(kclpy.recordprocessor):

def process_record(self,数据,分区键,序列号:

尝试:
假设传入的运动记录是json
data=json.loads(data)
user=data.get("user")

#使用检查点策略
返回true

我们无能为力")
return


kclpy.start(mystreamprocessor())

附带的[sylvite]库是一个可执行的jar,它将启动我们的记录处理器并为它提供记录。

请参阅[sylvite]库(https://github.com/empiricalresults/sylvite)了解详细信息和预建的jar。

``sh
>;Java-jar Salvi.jar——CONFIG= MyApp.属性'BR/'`BR/> BR/>日志> BR/> BR/>此库使用标准Python日志记录模块(命名空间'KCLPY’下的所有日志)。KCL多守护程序库期望stdout上有格式良好的数据,因此请确保将日志配置为使用stderr或文件。不要在处理器中使用打印语句!
BR/>背景
BR/>理解使用KCL的多LAN守护进程时的关键概念是有一个Java进程与KyExcel API进行所有通信,以及从STDIN/STDUT读取和写入的语言不可知子进程。这与hadoop流的工作原理非常相似。为了消耗这个流,我们需要启动一个Java进程,它将启动一个子进程,它将实际处理处理流数据。
BR/>虽然这听起来很复杂,但是在KCL上构建使我们能够实现所有检查点、重新定位和监视的优点。烘焙到KCL中的工作。kcl也由awslabs团队维护,因此任何未来的增强功能都将免费处理。




\recordprocessor

kclpy基于awslabs的示例代码,只需对日志和检查点进行一些小的调整。


\api

请参阅[亚马逊文档](http://docs.aws.amazon.com/kinisis/latest/dev/kinisis record processor implementation app py.html)。这个fork与原始实现保持兼容。



kclpy允许您自定义检查点行为。以下Kwargs可以传递给kclpy.recordprocessor:

*检查点频率秒-以固定间隔(秒)检查点。
*记录每个检查点-以固定数量处理的记录检查点。

``python
def main():
每60秒_分钟处理器=mystreamprocessor(检查点频率秒数=60)

您可以通过在*process_record*调用中返回true来强制执行explict检查点。但是要注意的是,这样做每一条记录都会导致对dynamodb表的大量写入。

``python
import kclpy

def process_data(data):
告诉库检查点,这样我们就不会再次处理它了。
返回true


class mystreamprocessor(kclpy.recordprocessor):

def process_record(self,data,partition_key,序列号:
shall_checkpoint=process_data(data)
return shall_checkpoint

def main():
=mystreamprocessor(
检查点频率秒数=0,
每个检查点的记录数=0


todo:启动处理器

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java为什么这一行是charg=(char)(br.read());被跳过?   java三重DES中的IV在哪里?   java Lombok注释不在Intellij idea下编译   java为什么我不能使用过滤器作为流中的最后一步   sqlite Java编译错误找不到符号方法next()   在Java中解析处理对int来说太大的数字   java摄像头不工作   java是“断言错误”好的做法?   java确定api请求是来自CURL还是POSTMAN?   JavaSpringMVC导航   java使单例模式代码更强大   当Java(解释器)区分大小写时,为什么Java编译器(javac)不区分大小写?   java注释字符串[]oneArr();vs字符串[][]twoArr();公告   java命名查询的利弊   java使用log4jweb在web应用程序中配置Log4j2。罐子   java正则表达式查找以开头和结尾的所有可能出现的文本~   java从字符串执行方法