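I have a Python class that uses a multiprocessing pool to process and clean a large dataset. The method that does most of the cleaning is `cleanData`, and it needs to call a second method, `processObservation`. I'm very new to Python multiprocessing, and I can't figure out how to make sure `processObservation` gets called from `cleanData` when the new processes are spawned. How do I do this? I'd prefer to keep all of these methods inside the class. I suspect this has something to do with the `__call__` definition, but I'm not sure how to modify it appropriately.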
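```python
import multiprocessing as mp
import os
import pickle  # the original used cPickle (Python 2); pickle works on 2 and 3


class Cleaner(object):  # hypothetical name; the class statement was not shown
    # processObservation(self, obs) -> (key, value) is defined elsewhere in the class (not shown).

    def processData(self, dataset, num_procs=mp.cpu_count()):
        dataSize = len(dataset)
        procs = mp.Pool(processes=num_procs, maxtasksperchild=1)
        # Generate data chunks for processing (// keeps the size an int; max() avoids a 0 step).
        chunk = max(1, dataSize // num_procs)
        dataChunk = [(i, i + chunk) for i in range(0, dataSize, chunk)]
        print('Number of data chunks %d' % len(dataChunk))
        results = []
        for count, (start, end) in enumerate(dataChunk, start=1):
            results.append(procs.apply_async(self.cleanData, args=(dataset[start:end], count)))
        procs.close()
        procs.join()
        for r in results:
            r.get()  # re-raises any worker exception; apply_async hides it otherwise

    def cleanData(self, data, procNumber):
        print('Spawning new process: %d' % os.getpid())
        tempDict = dict()
        print(len(data))
        for obs in data:
            key, value = self.processObservation(obs)  # a method, so call it through self
            tempDict[key] = value
        with open('../dataMP/cleanedData_' + str(procNumber) + '.p', 'wb') as f:
            pickle.dump(tempDict, f)

    def __call__(self, dataset, count):
        return self.cleanData(dataset, count)
```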
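As an aside, the chunk boundaries computed in `processData` can be checked on their own. With floor division, 10 items over 3 processes gives a chunk size of 3 and therefore four chunks; the pool just queues the extra one, so this is not a bug. The sketch below (the helper name `make_chunks` is hypothetical) uses ceiling division instead, so there are never more chunks than processes:

```python
def make_chunks(data_size, num_procs):
    """Return (start, end) slice bounds covering range(data_size) in at most num_procs pieces."""
    # Ceiling division spreads the remainder over the existing chunks
    # instead of producing an extra, short chunk; max() avoids a zero step.
    chunk = max(1, -(-data_size // num_procs))
    # Clip the last end so it never runs past the data.
    return [(i, min(i + chunk, data_size)) for i in range(0, data_size, chunk)]


print(make_chunks(10, 3))  # [(0, 4), (4, 8), (8, 10)]
```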
Without reproducible code or the actual error message, it's hard to tell what's going on.
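However, your problem is most likely caused by using `multiprocessing` inside a class. `Pool` has to pickle the function it sends to the worker processes, and in Python 2 bound methods such as `self.cleanData` cannot be pickled (recent Python 3 versions can pickle them, as long as the instance itself is picklable). See: Using multiprocessing in a class, and Multiprocessing: How to use Pool.map on a function defined in a class?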
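One common workaround, sketched here under some assumptions, is to pass the instance itself rather than a bound method to `apply_async`. A class defined at module level pickles by reference, so the worker can rebuild it and invoke `__call__`, which can then reach any other method through `self`. This only works if everything stored on `self` is picklable; that is why the pool below is a local variable and not an attribute. The `Cleaner` class, its `suffix` attribute, and the toy `processObservation` body are hypothetical stand-ins for the asker's code. Results are returned rather than pickled to disk, which keeps the sketch self-contained.

```python
import multiprocessing as mp


class Cleaner(object):
    """Hypothetical stand-in for the asker's class."""

    def __init__(self, suffix):
        # Only picklable state lives on self, so the instance can be sent to workers.
        self.suffix = suffix

    def processObservation(self, obs):
        # Toy cleaning step (assumption): the key is the stripped, lowercased text.
        key = obs.strip().lower()
        return key, key + self.suffix

    def cleanData(self, data, procNumber):
        tempDict = {}
        for obs in data:
            # Call the sibling method through self, not as a bare name.
            key, value = self.processObservation(obs)
            tempDict[key] = value
        return procNumber, tempDict

    def __call__(self, data, procNumber):
        # Makes the instance callable, so the pool can be handed `self` directly.
        return self.cleanData(data, procNumber)

    def processData(self, dataset, num_procs=2):  # small default for the demo
        chunk = max(1, -(-len(dataset) // num_procs))  # ceiling division
        pool = mp.Pool(processes=num_procs)  # local, so it is never pickled with self
        try:
            pending = [
                pool.apply_async(self, args=(dataset[i:i + chunk], n))
                for n, i in enumerate(range(0, len(dataset), chunk), start=1)
            ]
            # get() waits for each task and re-raises any exception from the worker.
            results = [r.get() for r in pending]
        finally:
            pool.close()
            pool.join()
        merged = {}
        for _, part in results:
            merged.update(part)
        return merged


if __name__ == "__main__":
    # The guard is required on platforms that start workers by re-importing this module.
    print(Cleaner("_ok").processData([" A", "b ", "C", "d"]))
    # {'a': 'a_ok', 'b': 'b_ok', 'c': 'c_ok', 'd': 'd_ok'}
```

The same idea applies to `Pool.map`: `pool.map(instance, items)` works when `__call__` takes a single argument.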