异步运动库

async-kinesis的Python项目详细描述


异步运动

Code style: blackPyPI versionPython 3.6Python 3.6

pip install async-kinesis

功能

  • 为生产者和消费者使用队列
    • 如果有足够的刷新空间或达到“缓冲区时间”后,生产者将使用put_records()刷新
    • 使用者独立于碎片读取器在msg队列上迭代
  • 可配置以处理碎片限制,但如果需要,将限制/重试
    • IE多个独立客户端正在饱和碎片
  • 心跳检查点
    • 死锁+如果检查点在“会话超时”内无法心跳,则重新分配碎片
  • 处理器(聚合器+序列化器)
    • json行分隔,msgpack

有关详细信息,请参见docs/design。 关于为什么要重新发明轮子,请参见docs/yetanother

环境变量

根据BOTO3的要求

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY

生产者

from kinesis import Producer

async with Producer(stream_name="test") as producer:
    # Put item onto queue to be flushed via put_records()
    await producer.put({'my': 'data'})

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
buffer_time0.5Buffer time in seconds before auto flushing records
put_rate_limit_per_shard1000"A single shard can ingest up to 1 MiB of data per second (including partition keys) or 1,000 records per second for writes"
put_bandwidth_limit_per_shard1024Kb per sec. max is 1024 per shard (ie 1 MiB). Keep below to minimize ProvisionedThroughputExceeded" errors
batch_size500"Each PutRecords request can support up to 500 records"
max_queue_size10000put() method will block when queue is at max
after_flush_funNoneasync function to call after doing a flush (err put_records()) call
processorJsonProcessor()Record aggregator/serializer. Default is JSON without aggregation. Note this is highly inefficient as each record can be up to 1Mib

消费者

from kinesis import Consumer

async with Consumer(stream_name="test") as consumer:
    while True:
        async for item in consumer:
            print(item)
        # caught up.. take a breather~

选项:

(引号中的注释是根据AWS文档的动态限制)

ArgDefaultDescription
region_nameNoneAWS Region
max_queue_size10000the fetch() task shard will block when queue is at max
max_shard_consumersNoneMax number of shards to use. None = all
record_limit10000Number of records to fetch with get_records()
sleep_time_no_records2No of seconds to sleep when caught up
iterator_typeTRIM_HORIZONDefault shard iterator type for new/unknown shards (ie start from start of stream). Alternative is "LATEST" (ie end of stream)
shard_fetch_rate1No of fetches per second (max = 5). 1 is recommended as allows having multiple consumers without hitting the max limit.
checkpointerMemoryCheckPointer()Checkpointer to use
processorJsonProcessor()Record aggregator/serializer. Must Match processor used by Producer()

检查点

  • 内存(默认但有点无意义)
    MemoryCheckPointer()
  • redis
    RedisCheckPointer(name, session_timeout=60, heartbeat_frequency=15, is_cluster=False)

需要env:

    REDIS_HOST

需要pip install aredis

处理器(聚合器+序列化器)

聚合允许成批处理多个记录以更有效地使用流。 参考https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/

ClassAggregatorSerializerDescription
StringProcessorSimpleAggregatorStringSerializerSingle String record
JsonProcessorSimpleAggregatorJsonSerializerSingle JSON record
JsonLineProcessorNewlineAggregatorJsonSerializerMultiple JSON record separated by new line char
MsgpackProcessorNetstringAggregatorMsgpackSerializerMultiple Msgpack record framed with Netstring Protocol (https://en.wikipedia.org/wiki/Netstring)

注意,您可以很容易地定义自己的处理器,因为它只是一个继承aggregator+serializer的类。

class MsgpackProcessor(Processor, NetstringAggregator, MsgpackSerializer):
    pass

只需使用serialize()和deserialize()方法定义一个新的序列化程序类。

注意:

  • 如果安装了json,json将使用pip install ujson
  • msgpack要求安装pip install msgpack

基准/示例

有关代码,请参见benchmark.py

5万件约1K(Python)大小的物品,使用单个碎片。

Benchmark

单元测试

使用https://github.com/mhart/kinesalite进行本地测试。

通过Docker运行测试

docker-compose up --abort-on-container-exit --exit-code-from test

用于本地测试

docker-compose up kinesis redis

然后在您的虚拟机中

nosetests

# or run individual test
nosetests tests.py:KinesisTests.test_create_stream_shard_limit_exceeded

注意,有一些测试用例使用actualaws kinesists(awskineists) 这些操作需要设置env才能运行

使用创建“.env”文件

TESTING_USE_AWS_KINESIS=1

注意,如果提交PR,则可以忽略这些测试,除非更改了核心批处理/处理行为。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java查找事件分派线程冲突   java画布。DrawBitmap()不会在安卓上绘制任何内容!!:(   ruby+appium或java+appium用于移动本机应用程序自动化   java Jersey REST字符编码   java使用json将字符串转换为Arraylist   java如何在Groovy中检查字符串是否与模式匹配   java如何在抽象arraylist中添加抽象arraylist   Java servlets,JSP更改内容od DIV   java在J2ME中通过http发送和接收数据,并处理菜单和屏幕   Jar文件与JAVA类路径的结合   java按钮不可见,它将连接安卓 studio中的另一个活动   java是否可以使用SFTP JSch库进行多部分文件上载?   facelet中ui:composition和ui:decoration的java差异   java得到的数字不能被任何东西除   java logback:SizeAndTimeBasedRollingPolicy不删除包含4位“%i”的文件   java数据库管理器将连接“借用”到数据库   java javaFx标签wrapText不起作用   java是否可以在同一个系统上同时运行两台服务器?或者,当XAMPP/WAMP未运行时,如何在Eclipse中执行MySQL查询?   递归Java编程