我发誓我在某个地方看到了下面的例子,但是现在我找不到这个例子,而且这个例子不起作用。永远不会调用该函数。
编辑:代码已更新
在池.map显示为启动QueueWriter实例,并且到达了\u优call_Uu函数。然而,工人似乎从来没有开始,或者至少没有从队列中提取结果。我的排队方式正确吗?为什么工人们不开火?在
import multiprocessing as mp
import os
import random
class QueueWriter(object):
def __init__(self, **kwargs):
self.grid = kwargs.get("grid")
self.path = kwargs.get("path")
def __call__(self, q):
print self.path
log = open(self.path, "a", 1)
log.write("QueueWriter called.\n")
while 1:
res = q.get()
if res == 'kill':
self.log.write("QueueWriter received 'kill' message. Closing Writer.\n")
break
else:
self.log.write("This is where I'd write: {0} to grid file.\n".format(res))
log.close()
log = None
class Worker(object):
def __init__(self, **kwargs):
self.queue = kwargs.get("queue")
self.grid = kwargs.get("grid")
def __call__(self, idx):
res = self.workhorse(self, idx)
self.queue.put((idx,res))
return res
def workhorse(self,idx):
#in reality a fairly complex operation
return self.grid[idx] ** self.grid[idx]
if __name__ == '__main__':
# log = open(os.path.expanduser('~/minimal.log'), 'w',1)
path = os.path.expanduser('~/minimal.log')
pool = mp.Pool(mp.cpu_count())
manager = mp.Manager()
q = manager.Queue()
grid = [random.random() for _ in xrange(10000)]
# in actuality grid is a shared resource, read by Workers and written
# to by QueueWriter
qWriter = QueueWriter(grid=grid, path=path)
watcher = pool.map(qWriter, (q,),1)
wrkr = Worker(queue=q,grid=grid)
result = pool.map(wrkr, range(10000), 1)
result.get()
q.put('kill')
pool.close()
pool.join()
因此,日志确实打印了初始化消息,但之后永远不会调用函数。这是我经常讨论的酸洗问题吗?我已经找到了关于类成员函数的答案,但是类实例呢?在
在温柔耐心的催促下(谢谢!)我想我已经解决了问题。我还没有把它应用到我的原始代码中,但是它在上面的例子中起作用了,我将针对将来的实现问题提出新的问题。在
因此,除了在代码中更改目标文件(本例中为日志)的打开位置之外,我还将QueueWriter实例作为单个多处理进程启动,而不是使用
pool.map
。正如martineau指出的,映射调用块直到qWriter.__call__()
返回,这阻止了工人被调用。在上面的代码中还有一些其他错误,但这些错误是偶然发生的,并在下面进行了修复:
相关问题 更多 >
编程相关推荐