异步运动库

async-kinesis的Python项目详细描述


异步运动

Code style: blackPyPI versionPython 3.6Python 3.6

pip install async-kinesis

功能

  • 为生产者和消费者使用队列
    • 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
    • 使用者独立于碎片读取器在msg队列上迭代
  • 可配置以处理碎片限制,但如果需要,将限制/重试
    • IE多个独立客户端正在饱和碎片
  • 心跳检查点
    • 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片
  • 处理器(聚合器+序列化器)
    • json行分隔,msgpack

有关详细信息,请参见docs/design。 关于为什么要重新发明轮子,请参见docs/yetanother

环境变量

根据BOTO3的要求

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY

生产者

from kinesis import Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
buffer_time0.5Buffer time in seconds before auto flushing records
put_rate_limit_per_shard1000"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"
put_bandwidth_limit_per_shard1024Kb per sec. max is 1024 per shard (ie 1 MiB). Keep below to minimize ProvisionedThroughputExceeded" errors
batch_size500"Each PutRecords request can support up to 500 records"
max_queue_size10000put() method will block when queue is at max
after_flush_funNoneasync function to call after doing a flush (err put_records()) call
processorJsonProcessor()Record aggregator/serializer. Default is JSON without aggregation. Note this is highly inefficient as each record can be up to 1Mib

消费者

from kinesis import Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
max_queue_size10000the fetch() task shard will block when queue is at max
max_shard_consumersNoneMax number of shards to use. None = all
record_limit10000Number of records to fetch with get_records()
sleep_time_no_records2No of seconds to sleep when caught up
iterator_typeTRIM_HORIZONDefault shard iterator type for new/unknown shards (ie start from start of stream). Alternative is "LATEST" (ie end of stream)
shard_fetch_rate1No of fetches per second (max = 5). 1 is recommended as allows having multiple consumers without hitting the max limit.
checkpointerMemoryCheckPointer()Checkpointer to use
processorJsonProcessor()Record aggregator/serializer. Must Match processor used by Producer()

检查点

  • 内存(默认但有点无意义)
    MemoryCheckPointer()
  • redis
    RedisCheckPointer(name, session_timeout=60, heartbeat_frequency=15, is_cluster=False)

需要env:

    REDIS_HOST

需要pip install aredis

处理器(聚合器+序列化器)

聚合允许成批处理多个记录以更有效地使用流。 参考https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

ClassAggregatorSerializerDescription
StringProcessorSimpleAggregatorStringSerializerSingle String record
JsonProcessorSimpleAggregatorJsonSerializerSingle JSON record
JsonLineProcessorNewlineAggregatorJsonSerializerMultiple JSON record separated by new line char
MsgpackProcessorNetstringAggregatorMsgpackSerializerMultiple Msgpack record framed with Netstring Protocol (https://en.wikipedia.org/wiki/Netstring)

注意,您可以很容易地定义自己的处理器,因为它只是一个继承aggregator+serializer的类。

class MsgpackProcessor(Processor, NetstringAggregator, MsgpackSerializer):
    pass

只需使用serialize()和deserialize()方法定义一个新的序列化程序类。

注意:

  • 如果安装了json,json将使用pip install ujson
  • msgpack要求安装pip install msgpack

基准/示例

有关代码,请参见benchmark.py

5万件约1K(Python)大小的物品,使用单个碎片。

Benchmark

单元测试

使用https://github.com/mhart/kinesalite进行本地测试。

通过Docker运行测试

docker-compose up --abort-on-container-exit --exit-code-from test

用于本地测试

docker-compose up kinesis redis

然后在您的虚拟机中

nosetests

# or run individual test
nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded

注意,有一些测试用例使用actualaws kinesists(awskineists) 这些操作需要设置env才能运行

使用创建“.env”文件

TESTING_USE_AWS_KINESIS=1

注意,如果提交PR,则可以忽略这些测试,除非更改了核心批处理/处理行为。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何用java表示这个数学函数的算法   Java/Stream帮助:仅使用streams将嵌套的映射列表转换为映射   使用Selenium连接到数据库时发生java未知主机异常   java如何了解jvm内存使用:“堆内存”和“堆外内存”   java Oracle BI报告导入模板   java如何使用Spring将xml转换为bean?   java线程。join()以保证执行顺序   java从THINGSPEAK到ANDROID应用程序获取JSON数据   使用Java的stanford库中的异常   java正确使用来自其他类文件的方法   如果集合中的元素类型为接口类型,如何填充集合?(爪哇)   记录java。util。记录器创建的文件超过了应有的数量   类Java对象uniq值   尝试调用无法应用于()的方法时出现java错误