Asyncio不会在Python中同时运行所有任务。正在尝试批量导出csv文件

2024-10-04 09:27:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试读取一个大文件,并使用Asyncio导出成批CSV文件。 我知道Asyncio不支持同一个文件的Asyncio,所以我尝试根据给定批号的每个任务导出到单个文件。但它只是同步运行

我有main.py,它有一个函数def start()

def start():
    asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))

我有processing.py并有一个函数test_async()

async def test_async(dictRunData):
  num_logical_cpus = multiprocessing.cpu_count()
  with open(dictRunData['input_file'], 'r') as infile:
    content = infile.read().replace('\n', '')
    lstcontent = ast.literal_eval(content)

  tasks = []
  chunkNum = 0
  chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
  print(f"number of chunks: {len(chunk_contents)}")
  for chunk in chunk_contents:
    chunkNum += 1
    task = asyncio.create_task(process_chunk_async(chunk, chunkNum))
    tasks.append(task)

  result = await asyncio.gather(*tasks, return_exceptions=True)

下面是给定块的函数过程

async def process_chunk_async(chunk, chunkNum, dictRunData):
    dict_results = {}
    for data in chunk:
       ..do something..
       dict_results.append(data)

    outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)

这里是write_chunk_async

async def write_chunk_async(dict_results, chunkNum, dictRunData):
    fileName = f"_{chunkNum}.csv"
    wrtieFileTo = open(fileName,"a+")

    for data in dict_results.keys():
        wrtieFileTo.write(data + "\n")

    wrtieFileTo.close()

    print(f"Done write_chunk_async file: {fileName}")

Tags: 文件函数testasynciodataasyncdefresults
1条回答
网友
1楼 · 发布于 2024-10-04 09:27:56

asyncio仅在使用其API进行异步I/O时才提供并发性。在示例代码中,所有I/O(读取/写入文件)都是使用同步的阻塞API完成的,因此使用asyncio不会增加任何值。现在,asyncio实际上没有为异步读/写文件提供任何API,因为它在操作系统级别没有得到很好的支持。请参见Python wiki中的this explanation

有一个第三方库^{},它为文件I/O提供了一个asyncio友好的API,但它只是将所有工作都委托给后台线程,所以没有理由使用它,因为您没有尝试将文件I/O集成到已经使用asyncio的应用程序中。如果应用程序所做的只是读/写文件,那么直接使用线程即可。但是请记住,如果所有线程都在将文件读/写到同一个磁盘,那么多线程也可能没有多大帮助,因为最终所有线程都会在试图访问单个磁盘时相互竞争

相关问题 更多 >