我对Python非常陌生,对并行处理也完全陌生。在
我一直在编写代码来分析斑点图像数据(想想PALMlite),并试图使用multiprocessing
模块加快我的分析代码。在
对于小型数据集,我看到相当可观的速度提高到四个核心。对于大型数据集,我开始得到一个断言错误。我试图得出一个同样的错误:
import numpy as np
import multiprocessing as mp
import os
class TestClass(object):
def __init__(self, data):
super().__init__()
self.data = data
def top_level_function(self, nproc = 1):
if nproc > os.cpu_count():
nproc = os.cpu_count()
if nproc == 1:
sums = [self._sub_function() for i in range(10)]
elif 1 < nproc:
print('multiprocessing engaged with {} cores'.format(nproc))
with mp.Pool(nproc) as p:
sums = [p.apply_async(self._sub_function) for i in range(10)]
sums = [pp.get() for pp in sums]
self.sums = sums
return sums
def _sub_function(self):
return self.data.sum(0)
if __name__ == "__main__":
t = TestClass(np.zeros((126,512,512)))
ans = t.top_level_function()
print(len(ans))
ans = t.top_level_function(4)
print(len(ans))
t = TestClass(np.zeros((126,2048,2048)))
ans = t.top_level_function()
print(len(ans))
ans = t.top_level_function(4)
print(len(ans))
哪些输出:
^{pr2}$所以第一个例子运行得很好,但是后一个例子(更大的数据集)崩溃了。在
我对这个错误是从哪里来的以及如何修正它感到迷茫。任何帮助都将不胜感激。在
当你这么做的时候
发生的情况是,
self._sub_function
将被pickle 10次并发送到一个worker进程进行处理。要pickle一个实例方法,必须对整个实例(包括data
属性)进行pickle。快速检查显示,pickled时np.zeros((126,2048,2048))
需要4227858596个字节,而您要发送10倍的字节到10个不同的进程。在您在
_send_bytes
期间遇到一个错误,这意味着到工作进程的传输被中断,我猜是因为您达到了内存限制。在您可能应该重新考虑您的设计,如果每个工作人员都可以处理部分问题而不需要访问整个数据,那么多处理通常是最有效的。在
相关问题 更多 >
编程相关推荐