异步运动库

py-kinesis的Python项目详细描述


异步运动

Code style: black

功能

  • 为生产者和消费者使用队列
    • 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
    • 使用者独立于碎片读取器在msg队列上迭代
  • 可配置以处理碎片限制,但如果需要,将限制/重试
    • IE多个独立客户端正在饱和碎片
  • 心跳检查点
    • 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片

消费者设计

(有一些解释,有点复杂~)

  • fetch()被周期性地调用(0.2秒(即每秒最多5次,这是对shard get_records()的限制)
    • 遍历碎片列表(在启动时设置,当前未检测到重新硬化)
      • 如果不使用且不在“最大碎片消费者”限制下,则分配碎片,否则忽略/继续
      • 如果此碎片仍在提取,则忽略/继续
      • 如果碎片提取完成,则处理记录
        • 将记录放入队列
        • 将检查点记录添加到队列
        • 分配nextsharditerator
      • 再次创建(get_records())任务

注意,get_records()是通过“shard_fetch_rate=5”(即相同的0.2秒/5倍限制)限制的

这种模式似乎是维护消费者群的最简单方法,而无需太费劲地考虑下一个工作或处理新碎片等问题。

未实现

  • 重新硬化
  • 客户再平衡(即在消费者之间共享碎片)

另请参见

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

生产者

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

选项:

(引号中的注释是根据AWS文档的动态限制)

  • 地区名称

    AWS Region

  • 缓冲时间=0.5

    Buffer time in seconds before auto flushing records

  • 每碎片的Put_rate_limit_=1000

    "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes

  • 批量=500

    "Each PutRecords request can support up to 500 records"

  • 最大队列大小=10000

    put() method will block when queue is at max

  • 冲水后的乐趣

    async function to call after doing a flush (err put_records()) call

消费者

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

选项:

(引号中的注释是根据AWS文档的动态限制)

  • 地区名称

    AWS Region

  • 最大队列大小=1000

    the fetch() task shard

  • 最大碎片消费者=无

    Max number of shards to use. None = all

  • 记录极限=10000

    Number of records to fetch with get_records()

  • 睡眠时间无记录=2

    No of seconds to sleep when caught up

  • 迭代器type=“trim_horizon”

    Default shard iterator type for new/unknown shards (ie start from start of stream) Alternative is "LATEST" (ie end of stream)

  • 碎片提取率=5

    No of fetches per second (max = 5)

  • 检查点=无

    Checkpointer to use

检查点

  • 内存
  • redis

又一个python运动库?

遗憾的是,我发现的所有其他库都有问题:(

(其实我最近才找到这个,可能是个不错的选择?)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
在OSGI中使用cxf生成的客户端时出现Java类装入器问题和JaxB异常   java为什么要在javamail中迭代多部分电子邮件中的部分?   并发编程问题   JFileChooser&&System中未调用java windowClosing。退出功能不正常?   SQL查询的java语法分析   java如何使用AspectJ声明字段上的警告   什么是java向量。元素()C#等价物   java解析Android应用程序中tornado web服务中的CSV文件   java我试过c2dm,我需要服务器端   java调整JPanel大小以适应新的JLabel图标   Java与Python脚本的通信   java使用Saxon通过XSLT生成URL   java net::ERR_complete_CHUNKED_编码200(OK)来自struts应用程序中的tomcat   java如何为我的窗格设置不同的位置?   java使用Cypher Neo4j获取给定类型的所有节点(从SQL世界中的tablename中选择*)   nio使用Java解析文件值   java使用WSDL生成REST客户端会是错误的方向吗?   java如何在我的应用程序中构建类映射?   java按钮。setEnabled在第一个循环中不起作用   xPath适用于最后一页,但不适用于第一页