我试图通过apply_async将共享计数器传递给多处理中的任务,但是失败了,出现这样的错误:“RuntimeError:同步对象只应通过继承在进程之间共享”。怎么回事
def processLine(lines, counter, mutex):
pass
counter = multiprocessing.Value('i', 0)
mutex = multiprocessing.Lock()
pool = Pool(processes = 8)
lines = []
for line in inputStream:
lines.append(line)
if len(lines) >= 5000:
#don't queue more than 1'000'000 lines
while counter.value > 1000000:
time.sleep(0.05)
mutex.acquire()
counter.value += len(lines)
mutex.release()
pool.apply_async(processLine, args=(lines, counter, ), callback = collectResults)
lines = []
让池处理调度:
如果顺序无关紧要:
^{pr2}$pool.*map*()
函数具有chunksize
参数,您可以更改该参数以查看它是否影响您的案例中的性能。在如果代码期望在一个调用中传递多行:
限制排队项目数的一些替代方法是:
multiprocessing.Queue
设置最大大小(在这种情况下不需要池)。queue.put()
将在达到最大大小时阻塞,直到其他进程调用queue.get()
注意:每个值都有关联的锁,不需要单独的锁。在
我以一种不优雅的方式解决了这个问题
相关问题 更多 >
编程相关推荐