Values not synchronized when using multiprocessing.Process() in Python

Published 2024-09-30 22:27:06


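I'm trying to parallelize Dijkstra's algorithm. The idea is:

  • Split all nodes into several sets (number of sets = number of processors): vertexSets[i]
  • minDistance() finds the minimum vertex (the vertex with the smallest distance) in each set
  • Send this minimum vertex to all processes (add this vertex to every process's set)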
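To make the three steps concrete without any multiprocessing, here is a minimal sequential simulation of the partition idea. It is a sketch under assumptions, not the author's code: `partitioned_dijkstra` and `GRAPH` are hypothetical names, a 0 in the adjacency matrix is taken to mean "no edge", and the vertices are split into contiguous blocks the way `np.array_split` does. It also differs slightly from step 3 as written: instead of adding the global minimum `u` to every set, each part uses `u` to relax the distances of the vertices it owns, which is the usual partitioned formulation.

```python
import math


def partitioned_dijkstra(graph, src, num_parts=3):
    """Sequential simulation of partition-based Dijkstra (no real processes)."""
    n = len(graph)
    # Contiguous blocks of vertices, like np.array_split(range(n), num_parts).
    size, extra = divmod(n, num_parts)
    parts, start = [], 0
    for r in range(num_parts):
        end = start + size + (1 if r < extra else 0)
        parts.append(range(start, end))
        start = end

    dist = [math.inf] * n
    dist[src] = 0
    visited = [False] * n
    for _ in range(n):
        # Step 1: each part reports its local minimum unvisited vertex.
        local = [min((dist[v], v) for v in part if not visited[v])
                 for part in parts
                 if any(not visited[v] for v in part)]
        if not local:
            break
        # Step 2: reduce the local minima to one global minimum vertex u.
        d, u = min(local)
        if d == math.inf:
            break  # the remaining vertices are unreachable
        visited[u] = True
        # Step 3: each part relaxes edges from u into its own vertices.
        for part in parts:
            for v in part:
                w = graph[u][v]
                if w and not visited[v] and d + w < dist[v]:
                    dist[v] = d + w
    return dist


# The 9-vertex graph from the question (0 = no edge, assumed).
GRAPH = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
         [4, 0, 8, 0, 0, 0, 0, 11, 0],
         [0, 8, 0, 7, 0, 4, 0, 0, 2],
         [0, 0, 7, 0, 9, 14, 0, 0, 0],
         [0, 0, 0, 9, 0, 10, 0, 0, 0],
         [0, 0, 4, 14, 10, 0, 2, 0, 0],
         [0, 0, 0, 0, 0, 2, 0, 1, 6],
         [8, 11, 0, 0, 0, 0, 1, 0, 7],
         [0, 0, 2, 0, 0, 0, 6, 7, 0]]

print(partitioned_dijkstra(GRAPH, 0))  # [0, 4, 12, 19, 21, 11, 9, 8, 14]
```

In a real parallel version, step 2 is the only point where the parts have to exchange data, which is exactly where synchronization is needed.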

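I use a multiprocessing.Value variable, minVertex, to store the new minimum vertex each time a process finds one, and then propagate this minimum vertex to all processes. The 3 processes I'm using all run the same function, which pushes and reads the value of minVertex. I'm new to multiprocessing in Python, so what I'm using here may not be correct or the best approach.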
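One thing worth noting about this approach: a `multiprocessing.Value` holds a single number. In the question's `parallelDijkstra`, each process writes its own `u` into `minVertex` and reads it back while still holding the lock, so it only ever sees its own value. A shared Value can still work as a reduction target if each process compares its candidate with the stored one and replaces it only when smaller, all under one lock, and the result is read only after every writer has finished. The sketch below shows that pattern for a single round; `publish_min` and `global_min` are hypothetical names, and the candidates are made-up `(distance, vertex)` pairs.

```python
import multiprocessing as mp


def publish_min(best_dist, best_vertex, lock, d, v):
    """Keep the smaller (distance, vertex) pair instead of blindly overwriting."""
    with lock:  # the compare and both writes happen as one atomic step
        if (d, v) < (best_dist.value, best_vertex.value):
            best_dist.value = d
            best_vertex.value = v


def global_min(candidates):
    """Reduce one (distance, vertex) candidate per process to the overall minimum."""
    best_dist = mp.Value("d", float("inf"), lock=False)
    best_vertex = mp.Value("i", -1, lock=False)
    lock = mp.Lock()  # one lock guards both values together
    procs = [mp.Process(target=publish_min, args=(best_dist, best_vertex, lock, d, v))
             for d, v in candidates]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # only read the result once every writer has finished
    return best_dist.value, best_vertex.value


if __name__ == "__main__":
    # Hypothetical local minima from three processes: (distance, vertex).
    print(global_min([(4.0, 1), (float("inf"), -1), (8.0, 7)]))  # (4.0, 1)
```

Here `join()` is what guarantees that nobody reads too early. Inside a loop where the processes keep running, that role has to be played by something like `multiprocessing.Barrier` instead.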

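The list of minimum vertices should include [0, 1, 2] from process 1 and [7, 6] from process 2. These values need to be added to the vertexSet[] of all processes 1, 2 and 3. However, after minVertex has been updated for all processes, minVertex sometimes changes to a new value before it has been added to some processes' vertexSet. The initial sets are [0, 1, 2] for process 1, [3, 4, 5] for process 2, and [6, 7, 8] for process 3. Here is my code, followed by part of the output I get: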

```python
import multiprocessing
import sys

import numpy as np


# Methods of the Graph class; self.V, self.graph and self.minDistance() are defined elsewhere.
def parallelDijkstra(self, src, vertexSet, minVertex, lock):
    dist = [sys.maxsize] * self.V  # all nodes of the whole set start at "infinity"
    dist[src] = 0
    sptSet = [False] * self.V  # no node has been visited yet
    before = [-1] * self.V
    u = 0
    for _ in range(vertexSet.size):
        u = self.minDistance(dist, sptSet, vertexSet)  # extract the min vertex
        sptSet[u] = True
        # note: dist is not relaxed for the neighbours of u in this version

        with lock:  # equivalent to lock.acquire() ... lock.release()
            minVertex.value = u  # publish this process's minimum vertex

            print("process", multiprocessing.current_process().name)
            print("vertexSet before", vertexSet)
            print("*minVertex", minVertex.value)
            vertexSet = np.append(minVertex.value, vertexSet)
            print("vertexSet after", vertexSet)
            print(" ")


def separateDataProcessors(self, src, numProcessors):
    vertices = list(range(self.V))
    vertexSets = np.array_split(vertices, numProcessors)
    # [array([0, 1, 2]), array([3, 4, 5]), array([6, 7, 8])]

    # the min-distance vertex to propagate to all processes
    minVertex = multiprocessing.Value("i")

    lock = multiprocessing.Lock()

    # create one process per vertex set (p1, p2, p3 when numProcessors == 3)
    processes = [
        multiprocessing.Process(target=self.parallelDijkstra, name=f"p{i + 1}",
                                args=(src, vertexSets[i], minVertex, lock))
        for i in range(numProcessors)
    ]

    # start all processes, then wait for all of them to finish
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```
```
process p1
process p2
vertexSet before [0 3 4 5]
vertexSet before [0 1 2]
vertexSet after [0 6 7 8]
*minVertex 0

vertexSet after [0 1 2]
*minVertex 0
process p3

vertexSet before [0 6 7 8]
vertexSet after [7 0 3 4 5]
*minVertex 7
vertexSet after [0 6 7 8]

process p3
vertexSet before [0 6 7 8]

*minVertex 6
process p2
process p1
vertexSet before [0 1 2]
*minVertex 2
vertexSet after [0 6 7 8]
vertexSet after [0 1 2]

vertexSet before [7 0 3 4 5]

*minVertex 2
vertexSet after [2 7 0 3 4 5]
```
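The behaviour in the question is consistent with a missing synchronization point: nothing makes a process wait until the others have read `minVertex` before it overwrites it, and one Value can hold only one process's vertex at a time. A common remedy is `multiprocessing.Barrier`: every process writes its candidate into its own slot of a shared Array, waits at the barrier, reads all slots, and waits again before the next round. The following is a minimal sketch of that pattern for the question's graph, not a drop-in fix. Assumptions: 0 means "no edge", vertices are split into contiguous blocks like `np.array_split`, each process relaxes only the vertices it owns, and results are collected in a shared Array rather than printed (output printed by several processes may not appear in execution order). `worker`, `parallel_dijkstra`, `cand_dist` and `cand_vertex` are hypothetical names.

```python
import math
import multiprocessing as mp


def worker(rank, owned, graph, src, cand_dist, cand_vertex, result, barrier):
    """One process: owns the vertices in `owned` and keeps their distances locally."""
    dist = {v: (0.0 if v == src else math.inf) for v in owned}
    visited = set()

    for _ in range(len(graph)):
        # 1. Local minimum over my unvisited vertices ((inf, -1) if none).
        best = (math.inf, -1)
        for v in owned:
            if v not in visited and dist[v] < best[0]:
                best = (dist[v], v)
        cand_dist[rank], cand_vertex[rank] = best
        barrier.wait()  # all candidates have been published

        # 2. Every process computes the same global minimum (ties -> lower vertex).
        g_dist, u = min(zip(cand_dist, cand_vertex))
        barrier.wait()  # all reads are done; slots may be overwritten next round

        if g_dist == math.inf:
            break  # the remaining vertices are unreachable
        if u in dist:
            visited.add(u)
        # 3. Relax edges from u into the vertices this process owns.
        for v in owned:
            w = graph[u][v]
            if w and v not in visited and g_dist + w < dist[v]:
                dist[v] = g_dist + w

    for v in owned:
        result[v] = dist[v]


def parallel_dijkstra(graph, src, num_workers=3):
    n = len(graph)
    # Contiguous blocks, like np.array_split(range(n), num_workers).
    size, extra = divmod(n, num_workers)
    blocks, start = [], 0
    for r in range(num_workers):
        end = start + size + (1 if r < extra else 0)
        blocks.append(list(range(start, end)))
        start = end

    # One slot per process instead of a single shared Value.
    cand_dist = mp.Array("d", num_workers, lock=False)
    cand_vertex = mp.Array("i", num_workers, lock=False)
    result = mp.Array("d", n, lock=False)
    barrier = mp.Barrier(num_workers)

    procs = [
        mp.Process(target=worker,
                   args=(r, blocks[r], graph, src, cand_dist, cand_vertex, result, barrier))
        for r in range(num_workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return list(result)


if __name__ == "__main__":
    graph = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
             [4, 0, 8, 0, 0, 0, 0, 11, 0],
             [0, 8, 0, 7, 0, 4, 0, 0, 2],
             [0, 0, 7, 0, 9, 14, 0, 0, 0],
             [0, 0, 0, 9, 0, 10, 0, 0, 0],
             [0, 0, 4, 14, 10, 0, 2, 0, 0],
             [0, 0, 0, 0, 0, 2, 0, 1, 6],
             [8, 11, 0, 0, 0, 0, 1, 0, 7],
             [0, 0, 2, 0, 0, 0, 6, 7, 0]]
    print(parallel_dijkstra(graph, 0))
    # [0.0, 4.0, 12.0, 19.0, 21.0, 11.0, 9.0, 8.0, 14.0]
```

The two `barrier.wait()` calls per round do the work: the first guarantees that every candidate is written before anyone reads, the second that everyone has read before anyone overwrites. Because every round costs two barrier crossings, this is likely much slower than sequential Dijkstra on a 9-vertex graph; the pattern can only pay off, if at all, on much larger graphs.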

Graph (adjacency matrix):

```python
g.graph = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
           [4, 0, 8, 0, 0, 0, 0, 11, 0],
           [0, 8, 0, 7, 0, 4, 0, 0, 2],
           [0, 0, 7, 0, 9, 14, 0, 0, 0],
           [0, 0, 0, 9, 0, 10, 0, 0, 0],
           [0, 0, 4, 14, 10, 0, 2, 0, 0],
           [0, 0, 0, 0, 0, 2, 0, 1, 6],
           [8, 11, 0, 0, 0, 0, 1, 0, 7],
           [0, 0, 2, 0, 0, 0, 6, 7, 0]]
```
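To check whatever a parallel version produces, it helps to have reference distances from a plain sequential Dijkstra on this matrix. The sketch below uses `heapq` from the standard library; `dijkstra` and `GRAPH` are hypothetical names, and treating 0 entries as "no edge" is an assumption that matches the zero diagonal of the matrix.

```python
import heapq


def dijkstra(graph, src):
    """Sequential Dijkstra on an adjacency matrix (0 = no edge), using a binary heap."""
    n = len(graph)
    dist = [float("inf")] * n
    dist[src] = 0
    heap = [(0, src)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist[u]:
            continue  # stale heap entry: u was already settled with a shorter distance
        for v, w in enumerate(graph[u]):
            if w and d + w < dist[v]:
                dist[v] = d + w
                heapq.heappush(heap, (dist[v], v))
    return dist


GRAPH = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
         [4, 0, 8, 0, 0, 0, 0, 11, 0],
         [0, 8, 0, 7, 0, 4, 0, 0, 2],
         [0, 0, 7, 0, 9, 14, 0, 0, 0],
         [0, 0, 0, 9, 0, 10, 0, 0, 0],
         [0, 0, 4, 14, 10, 0, 2, 0, 0],
         [0, 0, 0, 0, 0, 2, 0, 1, 6],
         [8, 11, 0, 0, 0, 0, 1, 0, 7],
         [0, 0, 2, 0, 0, 0, 6, 7, 0]]

print(dijkstra(GRAPH, 0))  # [0, 4, 12, 19, 21, 11, 9, 8, 14]
```

Any parallel variant run from source 0 on this graph should reproduce these distances.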
