# async-kinesis

AsyncIO producer and consumer library for AWS Kinesis.

```
pip install async-kinesis
```
## Features

- Uses queues for both producer and consumer
  - producer flushes via `put_records()` if it has enough records to flush, or after "buffer_time" is reached
  - consumer iterates over a message queue independently of the shard readers
- Configurable to respect shard limits, but will throttle/retry if required
  - i.e. when multiple independent clients are saturating the shards
- Checkpointing with heartbeats
  - watches for deadlocks + reallocates shards if a checkpointer fails to heartbeat within "session_timeout"
- Processors (aggregator + serializer)
  - e.g. JSON line-delimited, msgpack

See docs/design for details, and docs/yetanother on why reinvent the wheel.
## Environment Variables

As required by boto3:

```
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
```
## Producer

```python
from kinesis import Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})
```
Options:

(comments in quotes are from the AWS docs)
Arg | Default | Description |
---|---|---|
region_name | None | AWS Region |
buffer_time | 0.5 | Buffer time in seconds before auto flushing records |
put_rate_limit_per_shard | 1000 | "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes" |
put_bandwidth_limit_per_shard | 1024 | KB per sec. Max is 1024 per shard (ie 1 MiB). Keep below this to minimize "ProvisionedThroughputExceeded" errors |
batch_size | 500 | "Each PutRecords request can support up to 500 records" |
max_queue_size | 10000 | put() method will block when queue is at max |
after_flush_fun | None | async function to call after a flush (i.e. put_records()) call |
processor | JsonProcessor() | Record aggregator/serializer. Default is JSON without aggregation. Note this is highly inefficient as each record can be up to 1 MiB |
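As a worked illustration of the quoted shard limits (plain arithmetic only, not part of the library): whether the 1,000 records/sec limit or the 1 MiB/sec bandwidth limit binds depends on the average record size.

```python
# Illustrative arithmetic only (not library code), using the limits quoted above.
RECORDS_PER_SEC_LIMIT = 1000          # "1,000 records per second for writes"
BYTES_PER_SEC_LIMIT = 1024 * 1024     # "1 MiB of data per second"

def max_records_per_sec(avg_record_bytes: int) -> int:
    """Records/sec a single shard can ingest at a given average record size."""
    by_bandwidth = BYTES_PER_SEC_LIMIT // avg_record_bytes
    return min(RECORDS_PER_SEC_LIMIT, by_bandwidth)

# Small records are bound by the record-count limit...
assert max_records_per_sec(100) == 1000
# ...large ones by bandwidth: 1 MiB / 5 KiB = 204 records/sec
assert max_records_per_sec(5 * 1024) == 204
```

This is why the defaults keep `put_bandwidth_limit_per_shard` at 1024 KB and `put_rate_limit_per_shard` at 1000.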
## Consumer

```python
from kinesis import Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~
```
Options:

(comments in quotes are from the AWS docs)
Arg | Default | Description |
---|---|---|
region_name | None | AWS Region |
max_queue_size | 10000 | the per-shard fetch() task will block when the queue is at max |
max_shard_consumers | None | Max number of shards to use. None = all |
record_limit | 10000 | Number of records to fetch with get_records() |
sleep_time_no_records | 2 | No of seconds to sleep when caught up |
iterator_type | TRIM_HORIZON | Default shard iterator type for new/unknown shards (ie start from start of stream). Alternative is "LATEST" (ie end of stream) |
shard_fetch_rate | 1 | No of fetches per second (max = 5). 1 is recommended as it allows multiple consumers without hitting the max limit |
checkpointer | MemoryCheckPointer() | Checkpointer to use |
processor | JsonProcessor() | Record aggregator/serializer. Must Match processor used by Producer() |
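The `shard_fetch_rate` default of 1 leaves headroom for other readers: AWS allows at most 5 GetRecords calls per second per shard (the "max = 5" above), so the number of consumers that can poll a shard concurrently without throttling works out as (illustrative arithmetic, not library code):

```python
# Illustrative arithmetic only (not library code): how many consumers can
# share a shard's GetRecords budget of 5 calls/sec without throttling?
AWS_MAX_FETCHES_PER_SEC = 5  # GetRecords limit per shard, per the AWS docs

def max_concurrent_consumers(shard_fetch_rate: float) -> int:
    return int(AWS_MAX_FETCHES_PER_SEC // shard_fetch_rate)

assert max_concurrent_consumers(1) == 5   # the recommended default
assert max_concurrent_consumers(5) == 1   # one consumer uses the full budget
```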
## Checkpointers

- memory (the default, but kind of pointless)

```
MemoryCheckPointer()
```

- Redis

```
RedisCheckPointer(name, session_timeout=60, heartbeat_frequency=15, is_cluster=False)
```

Requires the env var:

```
REDIS_HOST
```

Requires `pip install aredis`
## Processors (aggregator + serializer)

Aggregation allows batching multiple records together for more efficient use of the stream. See https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/
Class | Aggregator | Serializer | Description |
---|---|---|---|
StringProcessor | SimpleAggregator | StringSerializer | Single String record |
JsonProcessor | SimpleAggregator | JsonSerializer | Single JSON record |
JsonLineProcessor | NewlineAggregator | JsonSerializer | Multiple JSON records separated by a newline char |
MsgpackProcessor | NetstringAggregator | MsgpackSerializer | Multiple msgpack records framed with the Netstring protocol (https://en.wikipedia.org/wiki/Netstring) |
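The Netstring framing used by `NetstringAggregator` prefixes each payload with its length, so records can be unambiguously split back apart. A minimal standalone sketch of the idea (the library's internals may differ), assuming the standard `<length>:<payload>,` wire format:

```python
# Minimal sketch of Netstring framing (https://en.wikipedia.org/wiki/Netstring):
# each payload is written as b"<decimal length>:<payload>,".
def netstring_encode(payload: bytes) -> bytes:
    return str(len(payload)).encode() + b":" + payload + b","

def netstring_decode(buf: bytes):
    """Yield payloads from a buffer of concatenated netstrings."""
    while buf:
        length_str, _, rest = buf.partition(b":")
        length = int(length_str)
        payload, buf = rest[:length], rest[length + 1:]  # +1 skips trailing ","
        yield payload

framed = netstring_encode(b"hello") + netstring_encode(b"world")
assert framed == b"5:hello,5:world,"
assert list(netstring_decode(framed)) == [b"hello", b"world"]
```

Length-prefixed framing avoids the escaping problems a delimiter-based scheme (like newline separation) has with binary payloads such as msgpack.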
Note you can define your own processor easily, as it's simply a class inheriting from an aggregator + serializer:

```python
class MsgpackProcessor(Processor, NetstringAggregator, MsgpackSerializer):
    pass
```

Just define a new serializer class with serialize() and deserialize() methods.
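As a standalone sketch of such a serializer (a hypothetical class for illustration; the real ones also subclass the library's serializer base, whose exact interface should be checked against the source):

```python
import json

class SortedJsonSerializer:
    """Hypothetical serializer: JSON with sorted keys, encoded as UTF-8 bytes."""

    def serialize(self, item) -> bytes:
        return json.dumps(item, sort_keys=True).encode("utf-8")

    def deserialize(self, data: bytes):
        return json.loads(data.decode("utf-8"))

s = SortedJsonSerializer()
assert s.serialize({"b": 2, "a": 1}) == b'{"a": 1, "b": 2}'
assert s.deserialize(s.serialize({"b": 2, "a": 1})) == {"a": 1, "b": 2}
```

The only contract is the serialize()/deserialize() round trip over bytes; the paired aggregator decides how those byte payloads are packed into Kinesis records.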
Note:

- JSON will use `ujson` if installed (`pip install ujson`)
- msgpack requires `pip install msgpack`
## Benchmark/Example

See benchmark.py for the code.

50k items of approx 1K (Python) size, using a single shard.
## Unit Testing

Uses https://github.com/mhart/kinesalite for local testing.

Run the tests via Docker:

```
docker-compose up --abort-on-container-exit --exit-code-from test
```

For local testing use:

```
docker-compose up kinesis redis
```

then in your virtualenv:

```
nosetests

# or run individual test
nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded
```
Note there are a few test cases that use the actual AWS Kinesis service (AWSKinesisTests); these require env setup in order to run.

Create a ".env" file with:

```
TESTING_USE_AWS_KINESIS=1
```

Note these tests can be ignored when submitting a PR, unless the core batching/processing behaviour is changed.