上载blob函数中的Azure python异步IO内存耗尽

2024-06-14 20:27:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个asyncio python azure脚本,它使用多个任务将文件从asyncio队列上载到Blob。它工作正常,至少直到耗尽系统上所有可用内存为止。我不知道内存泄漏在哪里。通常我使用内存分析器,但这似乎不适用于异步函数

有人能告诉我我做错了什么,或者最好的办法是找出问题所在吗?谢谢我不清楚什么地方没有清理,如果有的话

我在工作队列中放置了几百到几千个文件,通常执行3-5个任务。在几分钟的时间内,这个程序会消耗掉3到6GB的驻留内存,然后开始消耗swap,直到它运行足够长的时间,内存不足导致它死亡。这是在一个使用Python 3.6.8和以下azure库的8GB内存的linux设备上:

azure common 1.1.25
azure core 1.3.0
azure identity 1.3.0
azure nspkg 3.0.2
azure存储blob 12.3.0

from azure.identity.aio import ClientSecretCredential
from azure.storage.blob.aio import BlobClient

async def uploadBlobsTask(taskName, args, workQueue):
    while not workQueue.empty():
        fileName = await workQueue.get()
        blobName = fileName.replace(args.sourceDirPrefix, '')

        blobClient = BlobClient(
            "https://{}.blob.core.windows.net".format(args.accountName),
            credential = args.creds,
            container_name = args.container,
            blob_name = blobName,
        )

        async with blobClient:
            args.logger.info("Task {}: uploading {} as {}".format(taskName, fileName, blobName))
            try:
                with open(fileName, "rb") as data:
                    await blobClient.upload_blob(data, overwrite=True)
                fileNameMoved = fileName + '.moved'
                with open(fileNameMoved, "w") as fm:
                    fm.write("")
            except KeyboardInterrupt:
                raise
            except:
                args.logger.error("Task {}: {}".format(taskName, traceback.format_exc()))
                await workQueue.put(fileName)
            finally:
                workQueue.task_done()


async def processFiles(args):
    workQueue = asyncio.Queue()

    for (path, dirs, files) in os.walk(args.sourceDir):
        for f in files:
            fileName = os.path.join(path, f)                               
            await workQueue.put(fileName)

    creds = ClientSecretCredential(args.tenant, args.appId, args.password)
    args.creds = creds
    tasks = [ args.loop.create_task(uploadBlobsTask(str(i), args, workQueue)) for i in range(1, args.tasks+1) ]
    await asyncio.gather(*tasks)
    await creds.close()


loop = asyncio.get_event_loop()
args.loop = loop
loop.run_until_complete(processFiles(args))
loop.close()


Tags: 内存loopasyncioformatasyncargsawaitfilename
1条回答
网友
1楼 · 发布于 2024-06-14 20:27:14

不管它值多少钱,我似乎已经设法解决了这个问题,这样它就可以在没有内存泄漏的情况下工作。为此,我获取了一个containerClient,然后从中获取BlobClient(即containerClient.get_blob_client()),而不是直接获取BlobClient对象。现在,总体内存使用率处于非常低的水平,而不是像以前那样持续增长

相关问题 更多 >