异步运动库

async-kinesis的Python项目详细描述


异步运动

Code style: blackPyPI versionPython 3.6Python 3.6

pip install async-kinesis

功能

  • 为生产者和消费者使用队列
    • 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
    • 使用者独立于碎片读取器在msg队列上迭代
  • 可配置以处理碎片限制,但如果需要,将限制/重试
    • IE多个独立客户端正在饱和碎片
  • 心跳检查点
    • 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片
  • 处理器(聚合器+序列化器)
    • json行分隔,msgpack

有关详细信息,请参见docs/design。 关于为什么要重新发明轮子,请参见docs/yetanother

环境变量

根据BOTO3的要求

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY

生产者

from kinesis import Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
buffer_time0.5Buffer time in seconds before auto flushing records
put_rate_limit_per_shard1000"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"
put_bandwidth_limit_per_shard1024Kb per sec. max is 1024 per shard (ie 1 MiB). Keep below to minimize ProvisionedThroughputExceeded" errors
batch_size500"Each PutRecords request can support up to 500 records"
max_queue_size10000put() method will block when queue is at max
after_flush_funNoneasync function to call after doing a flush (err put_records()) call
processorJsonProcessor()Record aggregator/serializer. Default is JSON without aggregation. Note this is highly inefficient as each record can be up to 1Mib

消费者

from kinesis import Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
max_queue_size10000the fetch() task shard will block when queue is at max
max_shard_consumersNoneMax number of shards to use. None = all
record_limit10000Number of records to fetch with get_records()
sleep_time_no_records2No of seconds to sleep when caught up
iterator_typeTRIM_HORIZONDefault shard iterator type for new/unknown shards (ie start from start of stream). Alternative is "LATEST" (ie end of stream)
shard_fetch_rate1No of fetches per second (max = 5). 1 is recommended as allows having multiple consumers without hitting the max limit.
checkpointerMemoryCheckPointer()Checkpointer to use
processorJsonProcessor()Record aggregator/serializer. Must Match processor used by Producer()

检查点

  • 内存(默认但有点无意义)
    MemoryCheckPointer()
  • redis
    RedisCheckPointer(name, session_timeout=60, heartbeat_frequency=15, is_cluster=False)

需要env:

    REDIS_HOST

需要pip install aredis

处理器(聚合器+序列化器)

聚合允许成批处理多个记录以更有效地使用流。 参考https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

ClassAggregatorSerializerDescription
StringProcessorSimpleAggregatorStringSerializerSingle String record
JsonProcessorSimpleAggregatorJsonSerializerSingle JSON record
JsonLineProcessorNewlineAggregatorJsonSerializerMultiple JSON record separated by new line char
MsgpackProcessorNetstringAggregatorMsgpackSerializerMultiple Msgpack record framed with Netstring Protocol (https://en.wikipedia.org/wiki/Netstring)

注意,您可以很容易地定义自己的处理器,因为它只是一个继承aggregator+serializer的类。

class MsgpackProcessor(Processor, NetstringAggregator, MsgpackSerializer):
    pass

只需使用serialize()和deserialize()方法定义一个新的序列化程序类。

注意:

  • 如果安装了json,json将使用pip install ujson
  • msgpack要求安装pip install msgpack

基准/示例

有关代码,请参见benchmark.py

5万件约1K(Python)大小的物品,使用单个碎片。

Benchmark

单元测试

使用https://github.com/mhart/kinesalite进行本地测试。

通过Docker运行测试

docker-compose up --abort-on-container-exit --exit-code-from test

用于本地测试

docker-compose up kinesis redis

然后在您的虚拟机中

nosetests

# or run individual test
nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded

注意,有一些测试用例使用actualaws kinesists(awskineists) 这些操作需要设置env才能运行

使用创建“.env”文件

TESTING_USE_AWS_KINESIS=1

注意,如果提交PR,则可以忽略这些测试,除非更改了核心批处理/处理行为。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Cassandra复制因子大于节点数   java J2EE JTA事务回滚不适用于OSE Glassfish 4.0(Build 89)   java spring安全预认证用户登录   org的java类文件。反应流。从RxJava编译示例时未找到Publisher?   java在使用dataFormat作为POJO通过Camel调用Web服务时无法设置SOAP标头   Javafx类的java静态实例   java如何防止一个部件在关闭时覆盖另一个部件的位置   sql server无法从我的java代码连接到数据库   java在JList(Swing)中显示带有的ArrayList   从Java中的CXF服务获取WSAddressing数据   使用资产文件夹进行java简单json解析(本地)   java LDAPException未绑定的无效凭据   JavaJSFspring部署到weblogic   JAVA中字符数组中的特定元素排列?   如果脚本位于不同的目录中,则ant不会使用exec标记运行Javashell脚本