从s3异步读取多个“大容量”json。有更好的方法吗?

2024-09-27 20:14:50 发布

您现在位置:Python中文网/ 问答频道 /正文

目标是尝试从s3加载大量的“大容量”json。我发现了aiobotocore,我觉得有必要尝试一下,希望能提高效率,同时熟悉{}。我试了一下,它很管用,但我基本上知道关于异步编程的nada。因此,我希望得到一些改进/意见。也许有一些善良的灵魂可以发现一些明显的错误。在

问题是boto3一次只支持一个http请求。通过使用Threadpool我设法得到了显著的改进,但我希望有一个更有效的方法。在

代码如下:

进口:

import os 
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE

我在某个地方找到了一个助手生成器,它可以从包含多个json的字符串中返回解码的json。在

^{pr2}$

此函数从具有给定前缀的s3 bucket获取密钥:

# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''

    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys

此函数用于获取所提供密钥的内容。除此之外,它还将解码内容列表变平:

async def get_object(loop,bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:


        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()    

    return list(iterload(content.decode()))       

这里是一个主函数,它收集所有找到的键的内容并将内容列表展平。在

async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:

        keys = await get_keys(loop, bucket, prefix)

        contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])     

        return list(chain.from_iterable(contents))

最后,我运行这个命令,dict的结果列表很好地结束在result

loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
  • 有一件事我觉得可能有点麻烦,那就是我在每个异步函数中创建一个客户机。也许可以把它拿出来。请注意,aiobotocore如何与多个客户机一起工作。

  • 此外,我认为在加载键的对象之前,您不需要等待所有键都已加载,我认为在这个实现中就是这样。我假设只要找到一个密钥,就可以调用get_object。所以,也许它应该是async generator。但我不完全清楚。

提前谢谢你!希望这对类似情况的人有帮助。在


Tags: keyimportclientawsloopjsongetasync
1条回答
网友
1楼 · 发布于 2024-09-27 20:14:50

先退房aioboto3

其次,aiobotore中的每个客户机都与aiohttp会话相关联。每个会话最多可以有max_pool_connections。这就是为什么在basic aiobotocore example中它在create_client上执行async with。因此,当使用完客户端时,池将被关闭。在

以下是一些提示:

  1. {a4应该避免污染事件。当使用此功能时,请将工作流视为流。在
  2. 这样可以避免你不得不使用异步.gather,这将在引发第一个异常后让任务在后台运行。在
  3. 您应该同时调整工作循环大小和max_pool_连接,并且只使用一个客户机,该客户机具有您希望(或基于所需计算)支持的任务数。在
  4. 你真的不需要传递循环,因为在现代python版本中,每个线程只有一个循环
  5. 您应该使用aws profiles(profile param to Session init)/environment variables,这样就不需要硬编码密钥和区域信息了。在

基于以上所述,我将如何做到:

import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # Get json content from s3 object

    # get object from s3
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()

    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))

相关问题 更多 >

    热门问题