同步工人池Python和多进程处理

2024-09-30 00:29:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我想做一个同步图的着色模拟。为了创建图(树),我使用了igraph包,并同步了我第一次使用的multiprocessing包。我构建了一个图,其中每个节点都有属性:labelcolor和{}。为了给树上色,我执行了以下函数(我没有给出完整的代码,因为它很长,我认为没有必要解决我的问题):

def sixColor(self):
        root = self.graph.vs.find("root")
        root["color"] = self.takeColorFromList(root["label"])
        self.sendToChildren(root)
        lista = []
        for e in self.graph.vs():
            lista.append(e.index)
        p = multiprocessing.Pool(len(lista))
        p.map(fun, zip([self]*len(lista), lista),chunksize=300) 


    def process_sixColor(self, id):
        v = self.graph.vs.find(id)
        if not v["name"] == "root":
            while True:
                if v["received"] == True:
                    v["received"] = False
                    #------------Part 1-----------
                    self.sendToChildren(v)
                    self.printInfo()
                    #-----------Part 2-------------
                    diffIdx = self.compareLabelWithParent(v)
                    if not diffIdx == -1:
                        diffIdxStr = str(bin(diffIdx))[2:]
                        charAtPos = (v["label"][::-1])[diffIdx]
                        newLabel = diffIdxStr + charAtPos
                        v["label"] = newLabel
                        self.sendToChildren(v)
                        colorNum = int(newLabel,2)
                        if colorNum in sixColorList:
                            v["color"] = self.takeColorFromList(newLabel)
                            self.printGraph()
                            break           

我希望每个节点(根节点除外)并行地同步调用函数process_sixColor,并且在所有节点生成Part 1之前,不会计算Part 2。但是我注意到这不能正常工作,一些节点在其他每个节点执行Part 1之前正在计算。我怎样才能解决这个问题?在


Tags: selfif节点rootmultiprocessinglabelgraphcolor
1条回答
网友
1楼 · 发布于 2024-09-30 00:29:09

您可以使用multiprocessing.Queuemultiprocessing.Event对象的组合来同步工作线程。使主进程创建一个Queue和一个Event,并将这两个都传递给所有工人。工人将使用Queue让主进程知道他们已经完成了第1部分。主进程将使用Event让所有工人知道所有工人都完成了第1部分。基本上

  • 工人将调用queue.put()让主进程知道他们已经到达第2部分,然后调用event.wait()等待主进程发出绿灯。

  • 主进程将反复调用queue.get(),直到它接收到与工作池中的worker相同多的消息,然后调用event.set(),为工人从第2部分开始工作开绿灯。

这是一个简单的例子:

from __future__ import print_function
from multiprocessing import Event, Process, Queue

def worker(identifier, queue, event):
    # Part 1
    print("Worker {0} reached part 1".format(identifier))

    # Let the main process know that we have finished part 1
    queue.put(identifier)

    # Wait for all the other processes
    event.wait()

    # Start part 2
    print("Worker {0} reached part 2".format(identifier))

def main():
    queue = Queue()
    event = Event()
    processes = []
    num_processes = 5

    # Create the worker processes
    for identifier in range(num_processes):
        process = Process(target=worker, args=(identifier, queue, event))
        processes.append(process)
        process.start()

    # Wait for "part 1 completed" messages from the processes
    while num_processes > 0:
        queue.get()
        num_processes -= 1

    # Set the event now that all the processes have reached part 2
    event.set()

    # Wait for the processes to terminate
    for process in processes:
        process.join()

if __name__ == "__main__":
    main()

如果您想在生产环境中使用它,您应该考虑如何处理第1部分中发生的错误。现在,如果第1部分中发生异常,工作进程将永远不会调用queue.put(),主进程将无限期地阻塞,等待来自失败的工作进程的消息。生产就绪的解决方案可能应该将整个第1部分包装在try..except块中,然后在队列中发送一个特殊的错误信号。如果队列中接收到错误信号,则主进程可以立即退出。在

相关问题 更多 >

    热门问题