ProcessPoolExecutor child processes suddenly stop

Posted 2024-09-28 01:31:29


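I have code that downloads files from gstorage, dumps the files to JSON, converts the JSON to CSV, then to Parquet, and finally uploads the result to AWS S3 (don't ask why; I'm not the one who wrote it).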

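From the logs I noticed that sometimes, by the end of a run, not all of the child processes have finished. Does anyone know why this happens? If not, do you think changing the ProcessPoolExecutor to a normal {} would help?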

In the main step, where I kick off the whole thing, I use:

with ProcessPoolExecutor(max_workers=NUM_OF_PROCESS_WORKERS) as process_executor:
    for table_type in TABLES_COLUMNS_MAPPER.keys():
        for node in nodes:
            process_executor.submit(handle_sstable_group_files_per_node, node, table_type)

I'm running on Ubuntu, if that helps.

Thanks


1 answer
User
#1 · Posted 2024-09-28 01:31:29

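So, to follow up on the comments: you still never collect the results. The context manager does wait for the submitted tasks to finish when the block exits, but an exception raised inside a worker is only stored on its future, so unless you call result() on that future the exception is silently "swallowed":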

test.py:

from concurrent.futures import ProcessPoolExecutor


def worker(i):

    if i == 3:
        raise Exception(f"ERROR: {i}")

    print(f"TASK: {i}")

    return i * i


def main():
    futures = []

    with ProcessPoolExecutor() as executor:
        for i in range(10):
            futures.append(executor.submit(worker, i))

        # for future in futures:
        #     print(future.result())


if __name__ == "__main__":
    main()

Test:

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
TASK: 6
TASK: 8
TASK: 7
TASK: 9

Now, when you uncomment these two lines:

for future in futures:
    print(future.result())

you can see the error (assuming you don't handle it inside the worker function):

$ python test.py
TASK: 0
TASK: 1
TASK: 2
TASK: 4
0
1
4
TASK: 8
TASK: 6
TASK: 7
TASK: 9
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "test.py", line 8, in worker
    raise Exception(f"ERROR: {i}")
Exception: ERROR: 3
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "f.py", line 30, in <module>
    main()
  File "test.py", line 25, in main
    print(future.result())
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
Exception: ERROR: 3
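
In the run above, the first failing result() call aborts the loop, so the remaining results are never printed. If you would rather see every failure and still keep the successful results, one option is to catch the exception per future. The following is only a minimal sketch of that idea, reusing the worker from test.py; the helper name run_all and the results/errors dictionaries are made up for illustration and are not part of the original code.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def worker(i):
    # Same failing worker as in test.py above.
    if i == 3:
        raise Exception(f"ERROR: {i}")
    return i * i


def run_all(executor, items):
    """Hypothetical helper: submit worker(item) for each item, collect results and errors."""
    # Map each future back to its input so a failure can be attributed to a task.
    future_to_item = {executor.submit(worker, item): item for item in items}
    results, errors = {}, {}
    for future in as_completed(future_to_item):
        item = future_to_item[future]
        try:
            # result() re-raises whatever exception the worker raised.
            results[item] = future.result()
        except Exception as exc:
            errors[item] = exc
    return results, errors


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results, errors = run_all(executor, range(10))
    print("ok:", sorted(results))
    for item, exc in sorted(errors.items()):
        print(f"failed: {item!r}: {exc!r}")
```

Applied to the code in the question, the same pattern would mean keeping the futures returned by process_executor.submit(handle_sstable_group_files_per_node, node, table_type) and logging whichever (node, table_type) pairs end up in the error dictionary, rather than discarding the futures.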
