我正在尝试读取一个大文件,并使用Asyncio导出成批CSV文件。 我知道Asyncio不支持同一个文件的Asyncio,所以我尝试根据给定批号的每个任务导出到单个文件。但它只是同步运行
我有main.py,它有一个函数def start()
def start():
asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))
我有processing.py并有一个函数test_async()
async def test_async(dictRunData):
num_logical_cpus = multiprocessing.cpu_count()
with open(dictRunData['input_file'], 'r') as infile:
content = infile.read().replace('\n', '')
lstcontent = ast.literal_eval(content)
tasks = []
chunkNum = 0
chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
print(f"number of chunks: {len(chunk_contents)}")
for chunk in chunk_contents:
chunkNum += 1
task = asyncio.create_task(process_chunk_async(chunk, chunkNum))
tasks.append(task)
result = await asyncio.gather(*tasks, return_exceptions=True)
下面是给定块的函数过程
async def process_chunk_async(chunk, chunkNum, dictRunData):
dict_results = {}
for data in chunk:
..do something..
dict_results.append(data)
outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)
这里是write_chunk_async
async def write_chunk_async(dict_results, chunkNum, dictRunData):
fileName = f"_{chunkNum}.csv"
wrtieFileTo = open(fileName,"a+")
for data in dict_results.keys():
wrtieFileTo.write(data + "\n")
wrtieFileTo.close()
print(f"Done write_chunk_async file: {fileName}")
asyncio
仅在使用其API进行异步I/O时才提供并发性。在示例代码中,所有I/O(读取/写入文件)都是使用同步的阻塞API完成的,因此使用asyncio
不会增加任何值。现在,asyncio
实际上没有为异步读/写文件提供任何API,因为它在操作系统级别没有得到很好的支持。请参见Python wiki中的this explanation有一个第三方库^{} ,它为文件I/O提供了一个
asyncio
友好的API,但它只是将所有工作都委托给后台线程,所以没有理由使用它,因为您没有尝试将文件I/O集成到已经使用asyncio
的应用程序中。如果应用程序所做的只是读/写文件,那么直接使用线程即可。但是请记住,如果所有线程都在将文件读/写到同一个磁盘,那么多线程也可能没有多大帮助,因为最终所有线程都会在试图访问单个磁盘时相互竞争相关问题 更多 >
编程相关推荐