基于低层次、多处理的aws动态生产者和消费者库

kinesis-python的Python项目详细描述


https://img.shields.io/travis/NerdWalletOSS/kinesis-python.svghttps://img.shields.io/codecov/c/github/NerdWalletOSS/kinesis-python.svgLatest PyPI version

official Kinesis python library需要使用amazon的“multilangdaemon”,这是一个java可执行文件 通过stdin/stdout上的管道消息进行操作。

ಠ_ಠ

从维护的角度来看,希望有一个客户机库的单一实现是有意义的 负责kpl的团队,要求安装jre,并且必须考虑 java和python使用的流对于在没有java的环境中工作的团队来说是不可取的。

这是一个纯粹的python实现,它是利用python的多处理的kinisis producer和consumer类 模块为每个碎片生成一个进程,然后通过队列将消息发送回主进程。这只取决于 关于boto3(aws sdk)、offspring(子流程实现)和six(py2/py3兼容性)。

它还包括一个dynamodb状态后端,允许多个碎片的多实例使用,并存储 检查点数据,以便您可以在重新启动或崩溃后恢复流中中断的位置。

概述

所有功能都包装在两个类中:KinesisConsumerKinesisProducer

消费者

使用者通过启动流中每个shard的进程,然后实现python迭代器协议来工作。

fromkinesis.consumerimportKinesisConsumerconsumer=KinesisConsumer(stream_name='my-stream')formessageinconsumer:print"Received message: {0}".format(message)

从每个shard进程接收的消息通过python队列传递回主进程 为处理而产生。信息没有严格的顺序,但这是动觉的属性,而不是这个 实施。

锁定、检查点和多实例消耗

当部署具有多个实例的应用程序时,可以利用dynamodb来协调哪个实例 负责哪个碎片,因为不希望每个实例处理所有记录。

无论是否有多个节点,在处理记录时都需要检查流,以便 如果重新启动消费者,则从停止的位置取货。

利用dynamodb的“state”后端允许用户协调哪个节点负责哪个碎片和 在我们目前正在读的数据流中。

fromkinesis.consumerimportKinesisConsumerfromkinesis.stateimportDynamoDBconsumer=KinesisConsumer(stream_name='my-stream',state=DynamoDB(table_name='my-kinesis-state'))formessageinconsumer:print"Received message: {0}".format(message)

TynDoDB表必须已经存在,并且必须具有^ {TT3}$ $ ^ {TT4}$,类型^ {TT5}$(String)。

生产者

制作人通过启动一个单一的流程来进行积累并发布到流中。

fromkinesis.producerimportKinesisProducerproducer=KinesisProducer(stream_name='my-stream')producer.put('Hello World from Python')

默认情况下,累积缓冲区时间为500ms,或最大记录大小为1MB,以先发生者为准。你可以 通过以秒为单位指定的buffer_timekwarg实例化生产者时更改缓冲区时间。为了 例如,如果您主要关心的是预算而不是性能,您可以在60秒的时间内累积。

producer=KinesisProducer(stream_name='my-stream',buffer_time=60)

后台进程采取预防措施,以确保在 关机时间通过信号处理程序和Python AT出口模块,但它不是完全持久的,如果你是 向生产者进程发送kill -9,任何累积的消息都将丢失。

AWS权限

默认情况下,producer、consumer&state类都使用默认的boto3 credentials chain。如果你想改变 您可以实例化自己的boto3.Session对象,并通过^{tt9}将其传递到构造函数中。$ KinesisProducerKinesisConsumerDynamoDB的关键字参数。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java将多个线程中的函数放入单个队列   数组在Java中,如何在不改变整数顺序的情况下找到整数组的顺序?   java控制器属于表示层?   java Apache Ivy和本地Maven repo如何处理使用Maven 3构建的快照   Java可与泛型类型进行比较   java这个表达式在泛型中是什么意思   JavaEclipse和TeamCity插件   java检测构造函数中的final是否为空   java如何在StanfordCoreNLP管道中同时使用词汇化和依赖性解析器?   java在AntUnit控制台日志中显示完整异常堆栈跟踪   lambda如何与Java 8供应商建立连锁关系   如何让GRPC的重试机制在Kubernetes集群中使用grpcjava工作?   如何使用openjdk:7 Docker映像和Gradle包装器避免“EC参数错误”?   java将集合映射扩展为一维映射新的“无法推断函数接口类型”