异步运动库

py-kinesis的Python项目详细描述


异步运动

Code style: black

功能

  • 为生产者和消费者使用队列
    • 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
    • 使用者独立于碎片读取器在msg队列上迭代
  • 可配置以处理碎片限制,但如果需要,将限制/重试
    • IE多个独立客户端正在饱和碎片
  • 心跳检查点
    • 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片

消费者设计

(有一些解释,有点复杂~)

  • fetch()被周期性地调用(0.2秒(即每秒最多5次,这是对shard get_records()的限制)
    • 遍历碎片列表(在启动时设置,当前未检测到重新硬化)
      • 如果不使用且不在“最大碎片消费者”限制下,则分配碎片,否则忽略/继续
      • 如果此碎片仍在提取,则忽略/继续
      • 如果碎片提取完成,则处理记录
        • 将记录放入队列
        • 将检查点记录添加到队列
        • 分配nextsharditerator
      • 再次创建(get_records())任务

注意,get_records()是通过“shard_fetch_rate=5”(即相同的0.2秒/5倍限制)限制的

这种模式似乎是维护消费者群的最简单方法,而无需太费劲地考虑下一个工作或处理新碎片等问题。

未实现

  • 重新硬化
  • 客户再平衡(即在消费者之间共享碎片)

另请参见

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

生产者

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

选项:

(引号中的注释是根据AWS文档的动态限制)

  • 地区名称

    AWS Region

  • 缓冲时间=0.5

    Buffer time in seconds before auto flushing records

  • 每碎片的Put_rate_limit_=1000

    "A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes

  • 批量=500

    "Each PutRecords request can support up to 500 records"

  • 最大队列大小=10000

    put() method will block when queue is at max

  • 冲水后的乐趣

    async function to call after doing a flush (err put_records()) call

消费者

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

选项:

(引号中的注释是根据AWS文档的动态限制)

  • 地区名称

    AWS Region

  • 最大队列大小=1000

    the fetch() task shard

  • 最大碎片消费者=无

    Max number of shards to use. None = all

  • 记录极限=10000

    Number of records to fetch with get_records()

  • 睡眠时间无记录=2

    No of seconds to sleep when caught up

  • 迭代器type=“trim_horizon”

    Default shard iterator type for new/unknown shards (ie start from start of stream) Alternative is "LATEST" (ie end of stream)

  • 碎片提取率=5

    No of fetches per second (max = 5)

  • 检查点=无

    Checkpointer to use

检查点

  • 内存
  • redis

又一个python运动库?

遗憾的是,我发现的所有其他库都有问题:(

(其实我最近才找到这个,可能是个不错的选择?)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java FloatingAction按钮与RecyclerView中的CardView重叠   java如何计算CardLayout中的卡数   从远程系统上传MySQL数据库并访问Java应用程序   java调用堆栈如何处理带或不带返回类型的递归?   Springboot中的java组计数聚集   java如何在javafx textarea中使用richtextfx   获取与Mockito相关的错误时出现Java问题   java如何将JaxRS响应转换为Wiremock响应   Hadoop集群java。net ConnectionException:连接被拒绝错误   java如何加载文件私有文件类型是pem   java在元空间中的提升和加载的类   如何将系统属性传递给从HTML启动的Java小程序   java如何从网页中获取值并在主类中使用它?安卓应用   java在春天,advisor和aspect之间有什么区别?   java如何检测文件是否已重命名?   java消息驱动Bean何时使用