python多进程同步更新字典

2024-05-08 20:49:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过多个过程更新一个通用字典。你能帮我找出这个代码有什么问题吗?我得到以下输出:

inside function
{1: 1, 2: -1}
comes here
inside function
{1: 0, 2: 2}
comes here
{1: 0, 2: -1}

谢谢。

from multiprocessing import Lock, Process, Manager

l= Lock()


def computeCopyNum(test,val):
    l.acquire()
    test[val]=val
    print "inside function"
    print test
    l.release()
    return

a=dict({1: 0, 2: -1})

procs=list()

for i in range(1,3):
    p = Process(target=computeCopyNum, args=(a,i))
    procs.append(p)
    p.start()

for p in procs:
p.join()
    print "comes here"

print a

Tags: intestlockfor字典here过程function
3条回答

您导入Manager,但不使用它。作为第一种方法,请改为:

a = Manager().dict({1: 0, 2: -1})

使用multiprocessing时,全局变量的工作方式与预期不同。子流程只能访问副本,并且在它们退出时会忘记它们所做的更改,除非您使用的是能够在流程之间传播信息的特殊设计的对象。

在进程之间传递数据有许多不同的替代方法,但是像上面那样使用Manager对象通常是最简单的。您还可以使用Manager对象创建多个共享对象:

manager = Manager()
a = manager.dict({1: 0, 2: -1})
b = manager.list((1, 2, 3))

有关详细信息,请参阅^{}文档。

另外,你使用的锁是不必要的。Manager帮你处理。正如docs所说

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.

进程不像线程那样共享内存。每个进程最终都有自己的独立副本。如果要在不同的线程中工作,则需要使用管道或其他进程间通信将数据返回到中心进程。

答案其实很简单。您正在使用多处理模块,使用该模块可以启动几个不同的python进程。不同的进程有不同的地址空间,它们不共享内存,因此所有进程都会写入自己的本地字典副本。

使用多处理模块时,进行进程间通信的最简单方法是使用队列在从属进程和主进程之间进行通信。

from multiprocessing import Process, Queue

def computeCopyNum(queue, val):
    queue.put(val) # can also put a tuple of thread-id and value if we would like to

procs=list()

queue = Queue()
for i in range(1,3):
    p = Process(target=computeCopyNum, args=(queue, i))
    procs.append(p)
    p.start()

for _ in procs:
    val = queue.get()
    # do whatever with val

for p in procs:
    p.join()

如果每个从进程都可以生成多个输出值,那么让每个从进程向队列写入一个sentinel值以向主进程发出它已经完成的信号可能是明智的。然后代码可能看起来像:

def slave(queue):
    for i in range(128): # just for example
        val = #some calculated result
        queue.put(val)

    queue.put(None) # add a sentinel value to tell the master we're done

queue = Queue()

# spawn 32 slave processes
num_procs = 32
procs = [Process(target=slave, args=(queue, )) for _ in range(num_procs)]
for proc in procs: 
    proc.start()

finished = 0
while finished < num_procs:
    item = queue.get()
    if item is None: 
        finished += 1
    else: 
        # do something with item

for proc in procs: 
    proc.join()

您也可以使用管理器,如另一个答案所示。这种方法的问题是,进程地址空间之间可能会发生很多隐式内存复制,这很难解释。我总是喜欢使用显式队列。

相关问题 更多 >