多处理:将类实例传递给池.map

2024-10-01 09:17:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我发誓我在某个地方看到了下面的例子,但是现在我找不到这个例子,而且这个例子不起作用。永远不会调用该函数。

编辑:代码已更新

在池.map显示为启动QueueWriter实例,并且到达了\u优call_Uu函数。然而,工人似乎从来没有开始,或者至少没有从队列中提取结果。我的排队方式正确吗?为什么工人们不开火?在

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs): 
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")    
        while 1:
            res = q.get()
            if res == 'kill':
                self.log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                self.log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(self, idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
#     log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)] 
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
    watcher = pool.map(qWriter, (q,),1)
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
    result.get()
    q.put('kill')
    pool.close()
    pool.join()    

因此,日志确实打印了初始化消息,但之后永远不会调用函数。这是我经常讨论的酸洗问题吗?我已经找到了关于类成员函数的答案,但是类实例呢?在


Tags: pathselfloggetqueuedefresmp
1条回答
网友
1楼 · 发布于 2024-10-01 09:17:35

在温柔耐心的催促下(谢谢!)我想我已经解决了问题。我还没有把它应用到我的原始代码中,但是它在上面的例子中起作用了,我将针对将来的实现问题提出新的问题。在

因此,除了在代码中更改目标文件(本例中为日志)的打开位置之外,我还将QueueWriter实例作为单个多处理进程启动,而不是使用pool.map。正如martineau指出的,映射调用块直到qWriter.__call__()返回,这阻止了工人被调用。在

上面的代码中还有一些其他错误,但这些错误是偶然发生的,并在下面进行了修复:

import multiprocessing as mp
import os
import random

class QueueWriter(object):
    def __init__(self, **kwargs): 
        self.grid = kwargs.get("grid")
        self.path = kwargs.get("path")

    def __call__(self, q):
        print self.path
        log = open(self.path, "a", 1)
        log.write("QueueWriter called.\n")    
        while 1:
            res = q.get()
            if res == 'kill':
                log.write("QueueWriter received 'kill' message. Closing Writer.\n")
                break
            else:
                log.write("This is where I'd write: {0} to grid file.\n".format(res))

        log.close()
        log = None

class Worker(object):
    def __init__(self, **kwargs):
        self.queue = kwargs.get("queue")
        self.grid = kwargs.get("grid")

    def __call__(self, idx):
        res = self.workhorse(idx)
        self.queue.put((idx,res))
        return res

    def workhorse(self,idx):
        #in reality a fairly complex operation
        return self.grid[idx] ** self.grid[idx]


if __name__ == '__main__':
#     log = open(os.path.expanduser('~/minimal.log'), 'w',1)
    path = os.path.expanduser('~/minimal.log')

    pool = mp.Pool(mp.cpu_count())
    manager = mp.Manager()
    q = manager.Queue()

    grid = [random.random() for _ in xrange(10000)] 
    # in actuality grid is a shared resource, read by Workers and written
    # to by QueueWriter

    qWriter = QueueWriter(grid=grid, path=path)
#     watcher = pool.map(qWriter, (q,),1)
# Start the writer as a single process rather than a pool
    p = mp.Process(target=qWriter, args=(q,))
    p.start()
    wrkr = Worker(queue=q,grid=grid)
    result = pool.map(wrkr, range(10000), 1)
#     result.get()
# not required for pool
    q.put('kill')
    pool.close()
    p.join()
    pool.join()

相关问题 更多 >