我使用的是multiprocessing
python模块。我有大约20-25个任务要同时运行。每个任务将创建一个大约20k行的pandas.DataFrame
对象。问题是,所有任务都执行得很好,但当涉及到“加入”过程时,它就停止了。我试过用“小”数据帧,效果很好。为了说明我的观点,我创建了下面的代码。在
import pandas
import multiprocessing as mp
def task(arg, queue):
DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
queue.put(DF)
print("DF %d stored" %arg)
listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]
for p in processes:
p.start()
for i,p in enumerate(processes):
print("joining %d" %i)
p.join()
results = [queue.get() for p in processes]
编辑:
带DF=熊猫.DataFrame({“hello”:range(10)})我都正确:“DF 0 stored”到“DF 19 stored”,与“joining 0”到“joining 19”相同。在
但是DF=熊猫.DataFrame({“hello”:range(1000)}问题出现了:当它存储DF时,连接步骤在“joining 3”之后停止。在
感谢您提供的有用提示:)
此问题在文档中的Pipes and Queues下进行了解释:
使用管理器是可行的,但是有很多更简单的方法来解决这个问题:
Queue
(例如,使用JoinableQueue
和task_done
)。在Pool.map
而不是重新发明轮子。(是的,很多Pool
所做的对你的用例来说是不必要的,但是它也不会妨碍你,好的是,你已经知道它是有效的。)我不会展示#1的实现,因为它太琐碎了,也不会展示#2的实现,因为它太痛苦了,但是对于#3:
(在2.7中,
Pool
在with
语句中可能不起作用;您可以将multiprocessing
的更高版本的端口安装回2.7 off PyPI,或者只需手动创建池,然后close
在try
/finally
中工作,如果文件在with
语句中不起作用,您只需处理一个文件…)你可能会问自己,为什么在这一点上它失败了,但是用更小的数字工作,甚至更小一点?在
这个数据帧的pickle值刚刚超过16K(这个列表本身就小了一点,但是如果你用10000而不是1000来尝试它,那么在没有Pandas的情况下应该会看到相同的结果)
所以,第一个子写16K,然后阻塞,直到有空间写最后几百个字节。但是在},除非你解除对管道的阻止,否则这是一个典型的死锁。有足够的空间让前4个通过,但没有空间让5个通过。因为你有4个核心,大多数时候,前4个通过的将是前4个。但偶尔4会打败3或者其他什么,然后你就不能加入3了。这种情况在8核机器上更常见。在
join
之后才从管道中取出任何东西(通过调用queue.get
),并且在它们退出之前你不能{相关问题 更多 >
编程相关推荐