用于aws kinisis的异步python客户端

async-kinesis-client的Python项目详细描述


异步运动客户端

使用异步的python kinisis客户端库

基于evan borgstromeborgstrom@nerdwallet.com的kinisis python项目 https://github.com/NerdWalletOSS/kinesis-python但是使用异步魔术

kinisis python的问题是所有的数据最终都在一个线程中 从那里被检查-所以尽管有许多进程,客户端 被检查点阻塞了。此外,它检查每个记录,这是 不可配置。

这个客户机基于aioboto3库,使用python 3.6+异步方法。

用法:

importasynciofromasync_kinesis_client.kinesis_consumerimportAsyncKinesisConsumerasyncdefread_stream():# This is a coroutine that reads all the records from a shardasyncdefread_records(shard_reader):asyncforrecordsinshard_reader.get_records():forrinrecords:print('Shard: {}; Record: {}'.format(shard_reader.shard_id,r))consumer=AsyncKinesisConsumer(stream_name='my-stream',checkpoint_table='my-checkpoint-table')# consumer will yield existing shards and will continue yielding# new shards if re-sharding happens             asyncforshard_readerinconsumer.get_shard_readers():print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))asyncio.ensure_future(read_records(shard_reader))asyncio.get_event_loop().run_until_complete(read_stream())

asyncSharDreaderasyncKinesisConsumer可以通过调用stop()方法从并行协程中停止, 在这种情况下,消费者将停止所有碎片读取器。 如果希望收到关闭碎片的通知,请在读取记录时捕获shardclosedexception

asyncshardreader在最新版本后面公开millis属性,这可能有助于确定应用程序性能。

asynckinesisconsumer具有以下配置方法:

set_checkpoint_interval(records)-检查点之前要跳过多少记录

set_lock_duration(time)-保留锁的秒数。消费者将在该时间之前尝试刷新锁

set_reader_sleep_time(time)-如果shard reader没有从动觉流接收到任何记录,它应该等待多长时间(以秒为单位,可能是小数)

set_checkpoint_callback(coro)-在检查下一批记录之前设置要调用的回调协程。协程参数:shardidsequenceNumber

制作人很琐碎:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)awaitproducer.put_record(record=b'bytes',partition_key='string',# optional, if none, default time-based key is usedexplicit_hash_key='string'# optional)

一次发送多条记录:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)records=[{'Data':b'bytes','PartitionKey':'string',# optional, if none, default time-based key is used'ExplicitHashKey':'string'# optional},...]response=awaitproducer.put_records(records=records)# See boto3 docs for response structure:# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records

AWS认证。对于在aws云外进行测试,特别是在使用多因素身份验证时,我发现以下代码片段非常有用:

importosimportaioboto3frombotocoreimportcredentialsfromaiobotocoreimportAioSessionworking_dir=os.path.join(os.path.expanduser('~'),'.aws/cli/cache')session=AioSession(profile=os.environ.get('AWS_PROFILE'))provider=session.get_component('credential_provider').get_provider('assume-role')provider.cache=credentials.JSONFileCache(working_dir)aioboto3.setup_default_session(botocore_session=session)

这允许在完成awsudo下的任何aws命令后重新使用缓存的会话令牌,您只需要设置aws_profile环境变量。

目前,库的测试仍然不足以应对不同的网络事件。 有人警告过你,你自己去冒险吧。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何为ConcurrentHashMap使用并设置适当的并发级别?   java泛型方法,运行时错误,   java在页面上显示加载的图像   java Paypal定期直接支付问题   java如何延迟重新绘制组件   JavaSpringBoot+Hibernate如何维护@Transient字段   java在其方法中获取关于类的信息   在java中将别名添加到枚举   java如何解决向google报告成绩时“需要重新连接客户端”的问题   清晰的java图像背景   java未找到适合JDateChooser的构造函数(字符串、字符串、字符)   java LRU缓存实现。某些测试用例的代码失败   if语句Java嵌套的if/Else条件   java JSoup“wrap”并非每次都按预期工作   Java Spring引导循环依赖于一个环境   ssl证书无法通过Java和IntelliJ连接到SOAP服务   带整数验证的Java扫描器   java在Flex中呈现具有动态列的datagrid   java Android:通过用户选择的选项将文件上载到服务器   子类中的java抛出错误、异常和运行时异常