我想做一个同步图的着色模拟。为了创建图(树),我使用了igraph
包,并同步了我第一次使用的multiprocessing
包。我构建了一个图,其中每个节点都有属性:label
、color
和{
def sixColor(self):
root = self.graph.vs.find("root")
root["color"] = self.takeColorFromList(root["label"])
self.sendToChildren(root)
lista = []
for e in self.graph.vs():
lista.append(e.index)
p = multiprocessing.Pool(len(lista))
p.map(fun, zip([self]*len(lista), lista),chunksize=300)
def process_sixColor(self, id):
v = self.graph.vs.find(id)
if not v["name"] == "root":
while True:
if v["received"] == True:
v["received"] = False
#------------Part 1-----------
self.sendToChildren(v)
self.printInfo()
#-----------Part 2-------------
diffIdx = self.compareLabelWithParent(v)
if not diffIdx == -1:
diffIdxStr = str(bin(diffIdx))[2:]
charAtPos = (v["label"][::-1])[diffIdx]
newLabel = diffIdxStr + charAtPos
v["label"] = newLabel
self.sendToChildren(v)
colorNum = int(newLabel,2)
if colorNum in sixColorList:
v["color"] = self.takeColorFromList(newLabel)
self.printGraph()
break
我希望每个节点(根节点除外)并行地同步调用函数process_sixColor
,并且在所有节点生成Part 1
之前,不会计算Part 2
。但是我注意到这不能正常工作,一些节点在其他每个节点执行Part 1
之前正在计算。我怎样才能解决这个问题?在
您可以使用
multiprocessing.Queue
和multiprocessing.Event
对象的组合来同步工作线程。使主进程创建一个Queue
和一个Event
,并将这两个都传递给所有工人。工人将使用Queue
让主进程知道他们已经完成了第1部分。主进程将使用Event
让所有工人知道所有工人都完成了第1部分。基本上工人将调用
queue.put()
让主进程知道他们已经到达第2部分,然后调用event.wait()
等待主进程发出绿灯。主进程将反复调用
queue.get()
,直到它接收到与工作池中的worker相同多的消息,然后调用event.set()
,为工人从第2部分开始工作开绿灯。这是一个简单的例子:
如果您想在生产环境中使用它,您应该考虑如何处理第1部分中发生的错误。现在,如果第1部分中发生异常,工作进程将永远不会调用
queue.put()
,主进程将无限期地阻塞,等待来自失败的工作进程的消息。生产就绪的解决方案可能应该将整个第1部分包装在try..except
块中,然后在队列中发送一个特殊的错误信号。如果队列中接收到错误信号,则主进程可以立即退出。在相关问题 更多 >
编程相关推荐