基于低层次、多处理的aws动态生产者和消费者库

kinesis-python的Python项目详细描述


https://img.shields.io/travis/NerdWalletOSS/kinesis-python.svghttps://img.shields.io/codecov/c/github/NerdWalletOSS/kinesis-python.svgLatest PyPI version

official Kinesis python library需要使用amazon的“multilangdaemon”,这是一个java可执行文件 通过stdin/stdout上的管道消息进行操作。

ಠ_ಠ

从维护的角度来看,希望有一个客户机库的单一实现是有意义的 负责kpl的团队,要求安装jre,并且必须考虑 java和python使用的流对于在没有java的环境中工作的团队来说是不可取的。

这是一个纯粹的python实现,它是利用python的多处理的kinisis producer和consumer类 模块为每个碎片生成一个进程,然后通过队列将消息发送回主进程。这只取决于 关于boto3(aws sdk)、offspring(子流程实现)和six(py2/py3兼容性)。

它还包括一个dynamodb状态后端,允许多个碎片的多实例使用,并存储 检查点数据,以便您可以在重新启动或崩溃后恢复流中中断的位置。

概述

所有功能都包装在两个类中:KinesisConsumerKinesisProducer

消费者

使用者通过启动流中每个shard的进程,然后实现python迭代器协议来工作。

fromkinesis.consumerimportKinesisConsumerconsumer=KinesisConsumer(stream_name='my-stream')formessageinconsumer:print"Received message: {0}".format(message)

从每个shard进程接收的消息通过python队列传递回主进程 为处理而产生。信息没有严格的顺序,但这是动觉的属性,而不是这个 实施。

锁定、检查点和多实例消耗

当部署具有多个实例的应用程序时,可以利用dynamodb来协调哪个实例 负责哪个碎片,因为不希望每个实例处理所有记录。

无论是否有多个节点,在处理记录时都需要检查流,以便 如果重新启动消费者,则从停止的位置取货。

利用dynamodb的“state”后端允许用户协调哪个节点负责哪个碎片和 在我们目前正在读的数据流中。

fromkinesis.consumerimportKinesisConsumerfromkinesis.stateimportDynamoDBconsumer=KinesisConsumer(stream_name='my-stream',state=DynamoDB(table_name='my-kinesis-state'))formessageinconsumer:print"Received message: {0}".format(message)

TynDoDB表必须已经存在,并且必须具有^ {TT3}$ $ ^ {TT4}$,类型^ {TT5}$(String)。

生产者

制作人通过启动一个单一的流程来进行积累并发布到流中。

fromkinesis.producerimportKinesisProducerproducer=KinesisProducer(stream_name='my-stream')producer.put('Hello World from Python')

默认情况下,累积缓冲区时间为500ms,或最大记录大小为1MB,以先发生者为准。你可以 通过以秒为单位指定的buffer_timekwarg实例化生产者时更改缓冲区时间。为了 例如,如果您主要关心的是预算而不是性能,您可以在60秒的时间内累积。

producer=KinesisProducer(stream_name='my-stream',buffer_time=60)

后台进程采取预防措施,以确保在 关机时间通过信号处理程序和Python AT出口模块,但它不是完全持久的,如果你是 向生产者进程发送kill -9,任何累积的消息都将丢失。

AWS权限

默认情况下,producer、consumer&state类都使用默认的boto3 credentials chain。如果你想改变 您可以实例化自己的boto3.Session对象,并通过^{tt9}将其传递到构造函数中。$ KinesisProducerKinesisConsumerDynamoDB的关键字参数。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java是否可以将一半的文本绘制成不同的颜色?   java如何在Eclipse中生成Javadoc HTML文件?   带有时间戳的java“select”preparedStatement返回始终为空的记录集   java将无符号类型写入Netty ChannelBuffer   java动态资源名称   java lookupDefaultPrintService()不返回系统默认打印机   java如何在播放模板中翻译#{get'title'/}?   java为什么JSR352的ItemWriter接口中有一个checkpointInfo?有任何示例实现吗?   在方法外部声明的Java引用变量存在于堆栈或堆上   java类、异常、用户输入   java JApplet通过Eclipse而不是web浏览器连接到本地主机Mysql   java soap metro转储