Python多处理apply_async+Valu

2024-06-23 18:39:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过apply_async将共享计数器传递给多处理中的任务,但是失败了,出现这样的错误:“RuntimeError:同步对象只应通过继承在进程之间共享”。怎么回事

def processLine(lines, counter, mutex):
    pass

counter = multiprocessing.Value('i', 0)
mutex = multiprocessing.Lock()
pool = Pool(processes = 8)
lines = []

for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
         #don't queue more than 1'000'000 lines
         while counter.value > 1000000:
                 time.sleep(0.05)
         mutex.acquire()
         counter.value += len(lines)
         mutex.release()
         pool.apply_async(processLine, args=(lines, counter, ), callback = collectResults)
         lines = []

Tags: 对象asynclenvalue错误linecounter计数器
2条回答

让池处理调度:

for result in pool.imap(process_single_line, input_stream):
    pass

如果顺序无关紧要:

^{pr2}$

pool.*map*()函数具有chunksize参数,您可以更改该参数以查看它是否影响您的案例中的性能。在

如果代码期望在一个调用中传递多行:

from itertools import izip_longest

chunks = izip_longest(*[iter(inputStream)]*5000, fillvalue='') # grouper recipe
for result in pool.imap(process_lines, chunks):
    pass

限制排队项目数的一些替代方法是:

  • multiprocessing.Queue设置最大大小(在这种情况下不需要池)。queue.put()将在达到最大大小时阻塞,直到其他进程调用queue.get()
  • 使用多处理原语(如条件或绑定信号量)手动实现生产者/消费者模式。在

注意:每个值都有关联的锁,不需要单独的锁。在

我以一种不优雅的方式解决了这个问题

def processLine(lines):
    pass

def collectResults(result):
    global counter
    counter -= len(result)

counter = 0
pool = Pool(processes = 8)
lines = []

for line in inputStream:
    lines.append(line)
    if len(lines) >= 5000:
         #don't queue more than 1'000'000 lines
         while counter.value > 1000000:
             time.sleep(0.05)
         counter.value += len(lines)
         pool.apply_async(processLine, args=(lines), callback = collectResults)
         lines = []

相关问题 更多 >

    热门问题