异步运动库
py-kinesis的Python项目详细描述
异步运动
功能
- 为生产者和消费者使用队列
- 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
- 使用者独立于碎片读取器在msg队列上迭代
- 可配置以处理碎片限制,但如果需要,将限制/重试
- IE多个独立客户端正在饱和碎片
- 心跳检查点
- 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片
消费者设计
(有一些解释,有点复杂~)
- fetch()被周期性地调用(0.2秒(即每秒最多5次,这是对shard get_records()的限制)
- 遍历碎片列表(在启动时设置,当前未检测到重新硬化)
- 如果不使用且不在“最大碎片消费者”限制下,则分配碎片,否则忽略/继续
- 如果此碎片仍在提取,则忽略/继续
- 如果碎片提取完成,则处理记录
- 将记录放入队列
- 将检查点记录添加到队列
- 分配nextsharditerator
- 再次创建(get_records())任务
- 遍历碎片列表(在启动时设置,当前未检测到重新硬化)
注意,get_records()是通过“shard_fetch_rate=5”(即相同的0.2秒/5倍限制)限制的
这种模式似乎是维护消费者群的最简单方法,而无需太费劲地考虑下一个工作或处理新碎片等问题。
未实现
- 重新硬化
- 客户再平衡(即在消费者之间共享碎片)
另请参见
生产者
async with Producer(stream_name="test") as producer:
# Put item onto queue to be flushed via put_records()
await producer.put({'my': 'data'})
选项:
(引号中的注释是根据AWS文档的动态限制)
地区名称
AWS Region
缓冲时间=0.5
Buffer time in seconds before auto flushing records
每碎片的Put_rate_limit_=1000
"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes
批量=500
"Each PutRecords request can support up to 500 records"
最大队列大小=10000
put() method will block when queue is at max
冲水后的乐趣
async function to call after doing a flush (err put_records()) call
消费者
async with Consumer(stream_name="test") as consumer:
while True:
async for item in consumer:
print(item)
# caught up.. take a breather~
选项:
(引号中的注释是根据AWS文档的动态限制)
地区名称
AWS Region
最大队列大小=1000
the fetch() task shard
最大碎片消费者=无
Max number of shards to use. None = all
记录极限=10000
Number of records to fetch with get_records()
睡眠时间无记录=2
No of seconds to sleep when caught up
迭代器type=“trim_horizon”
Default shard iterator type for new/unknown shards (ie start from start of stream) Alternative is "LATEST" (ie end of stream)
碎片提取率=5
No of fetches per second (max = 5)
检查点=无
Checkpointer to use
检查点
- 内存
- redis
又一个python运动库?
遗憾的是,我发现的所有其他库都有问题:(
https://github.com/NerdWalletOSS/kinesis-python
- 专业:
- 有点管用
- 骗局
- 螺纹的
- 解决一些问题的出色公关
- 主线程上每个记录的检查点
- 专业:
https://github.com/ungikim/kinsumer
- 专业:
- 处理碎片更改
- 无制作人
- 没有redis检查点/心跳
- 线程/似乎有点复杂~
- 骗局
- 仅限消费者
- 专业:
https://github.com/bufferapp/kiner
- 专业:
- 批处理
- 骗局
- 仅限制作人
- 专业:
https://github.com/niklio/aiokinesis
- 专业:
- 异步
- 无检查点
- 骗局
- 仅限于1个碎片/过于简单化
- 专业:
https://github.com/ticketea/pynesis
- 专业:
- 检查点
- 骗局
- 已经一年没有更新了
- 不使用Put_Records()
- 单线程/循环读取碎片
- 专业:
https://github.com/whale2/async-kinesis-client
- 专业:
- 检查点
- 异步
- 骗局
- 是吗?
- 专业:
(其实我最近才找到这个,可能是个不错的选择?)