池工作人员无法完成所有任务

2024-10-01 02:22:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个相对简单的python多处理脚本,它设置一个worker池,通过自定义管理器将输出附加到pandasdataframe。我发现,当我对池调用close()/join()时,apply_async提交的所有任务都没有完成。在

下面是一个简化的示例,它提交了1000个作业,但只完成了一半,从而导致断言错误。我是否忽略了一些非常简单的东西,或者这可能是一个bug?在

from pandas import DataFrame
from multiprocessing.managers import BaseManager, Pool

class DataFrameResults:
    def __init__(self):
        self.results = DataFrame(columns=("A", "B")) 

    def get_count(self):
        return self.results["A"].count()

    def register_result(self, a, b):
        self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

class MyManager(BaseManager): pass

MyManager.register('DataFrameResults', DataFrameResults)

def f1(results, a, b):
    results.register_result(a, b)

def main():
    manager = MyManager()
    manager.start()
    results = manager.DataFrameResults()

    pool = Pool(processes=4)

    for (i) in range(0, 1000):
        pool.apply_async(f1, [results, i, i*i])
    pool.close()
    pool.join()

    print results.get_count()
    assert results.get_count() == 1000

if __name__ == "__main__":
    main()

Tags: selfregisterclosegetasyncmaindefcount
1条回答
网友
1楼 · 发布于 2024-10-01 02:22:26

[编辑]您看到的问题是因为以下代码:

self.results = self.results.append(...)

这不是原子。因此,在某些情况下,线程在读取self.results(或在附加时)后中断,但在它可以将新帧分配给self.results>;此实例将丢失。在

正确的解决方案是等待使用results对象来获得结果,然后将它们全部追加到主线程中。在

相关问题 更多 >