目标是尝试从s3加载大量的“大容量”json。我发现了aiobotocore
,我觉得有必要尝试一下,希望能提高效率,同时熟悉{
问题是boto3一次只支持一个http请求。通过使用Threadpool
我设法得到了显著的改进,但我希望有一个更有效的方法。在
代码如下:
进口:
import os
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE
我在某个地方找到了一个助手生成器,它可以从包含多个json的字符串中返回解码的json。在
^{pr2}$此函数从具有给定前缀的s3 bucket获取密钥:
# Async stuff starts here
async def get_keys(loop, bucket, prefix):
'''Get keys in bucket based on prefix'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = []
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
keys.append(c['Key'])
return keys
此函数用于获取所提供密钥的内容。除此之外,它还将解码内容列表变平:
async def get_object(loop,bucket, key):
'''Get json content from s3 object'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
# get object from s3
response = await client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
这里是一个主函数,它收集所有找到的键的内容并将内容列表展平。在
async def go(loop, bucket, prefix):
'''Returns list of dicts of object contents'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = await get_keys(loop, bucket, prefix)
contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])
return list(chain.from_iterable(contents))
最后,我运行这个命令,dict的结果列表很好地结束在result
loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
有一件事我觉得可能有点麻烦,那就是我在每个异步函数中创建一个客户机。也许可以把它拿出来。请注意,aiobotocore
如何与多个客户机一起工作。
此外,我认为在加载键的对象之前,您不需要等待所有键都已加载,我认为在这个实现中就是这样。我假设只要找到一个密钥,就可以调用get_object
。所以,也许它应该是async generator
。但我不完全清楚。
提前谢谢你!希望这对类似情况的人有帮助。在
先退房aioboto3
其次,aiobotore中的每个客户机都与aiohttp会话相关联。每个会话最多可以有max_pool_connections。这就是为什么在basic aiobotocore example中它在
create_client
上执行async with
。因此,当使用完客户端时,池将被关闭。在以下是一些提示:
基于以上所述,我将如何做到:
相关问题 更多 >
编程相关推荐