Python的奇怪行为与多处理连接不执行

2024-05-17 06:25:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是multiprocessingpython模块。我有大约20-25个任务要同时运行。每个任务将创建一个大约20k行的pandas.DataFrame对象。问题是,所有任务都执行得很好,但当涉及到“加入”过程时,它就停止了。我试过用“小”数据帧,效果很好。为了说明我的观点,我创建了下面的代码。在

import pandas
import multiprocessing as mp

def task(arg, queue):
    DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
    queue.put(DF)
    print("DF %d stored" %arg)

listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]

for p in processes:
    p.start()

for i,p in enumerate(processes):
    print("joining %d" %i)
    p.join()

results = [queue.get() for p in processes]

编辑:

DF=熊猫.DataFrame({“hello”:range(10)})我都正确:“DF 0 stored”到“DF 19 stored”,与“joining 0”到“joining 19”相同。在

但是DF=熊猫.DataFrame({“hello”:range(1000)}问题出现了:当它存储DF时,连接步骤在“joining 3”之后停止。在

感谢您提供的有用提示:)


Tags: inimporthellodataframepandasdffortask
1条回答
网友
1楼 · 发布于 2024-05-17 06:25:05

此问题在文档中的Pipes and Queues下进行了解释:

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

使用管理器是可行的,但是有很多更简单的方法来解决这个问题:

  1. 首先从队列中读取数据,然后加入进程,而不是相反。在
  2. 手动管理Queue(例如,使用JoinableQueuetask_done)。在
  3. 只需使用Pool.map而不是重新发明轮子。(是的,很多Pool所做的对你的用例来说是不必要的,但是它也不会妨碍你,好的是,你已经知道它是有效的。)

我不会展示#1的实现,因为它太琐碎了,也不会展示#2的实现,因为它太痛苦了,但是对于#3:

def task(arg):
    DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
    return DF

with mp.Pool(processes=20) as p:
    results = p.map(task, range(20), chunksize=1)

(在2.7中,Poolwith语句中可能不起作用;您可以将multiprocessing的更高版本的端口安装回2.7 off PyPI,或者只需手动创建池,然后closetry/finally中工作,如果文件在with语句中不起作用,您只需处理一个文件…)


你可能会问自己,为什么在这一点上它失败了,但是用更小的数字工作,甚至更小一点?在

这个数据帧的pickle值刚刚超过16K(这个列表本身就小了一点,但是如果你用10000而不是1000来尝试它,那么在没有Pandas的情况下应该会看到相同的结果)

所以,第一个子写16K,然后阻塞,直到有空间写最后几百个字节。但是在join之后才从管道中取出任何东西(通过调用queue.get),并且在它们退出之前你不能{},除非你解除对管道的阻止,否则这是一个典型的死锁。有足够的空间让前4个通过,但没有空间让5个通过。因为你有4个核心,大多数时候,前4个通过的将是前4个。但偶尔4会打败3或者其他什么,然后你就不能加入3了。这种情况在8核机器上更常见。在

相关问题 更多 >