Python多处理管理器dict,从进程和主进程访问

2024-09-21 01:07:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试从主进程和进程访问多处理管理器dict。 我重写了一个值,并添加了一个值

最后,对于键2的值,dict应为61。 对于键1,它应该包含122个字符,因为我在dict的每个acess中添加了两个字符

我得到的是键1的字符数和键2的值总是不同的。 有时,他们是122和61预期。 尽管incre.*的print语句的总和总是应该是61

我试图将md1 dict传递给进程和函数,结果是一样的。 当我将trd.daemon更改为False时,结果也是一样的

我错过了什么

代码:

import multiprocessing as mp

def local_func():

    md1[1] += 'DR'
    md1['2'] += 1   
    print('incre func')

def test_func_proc(idd):

    md1[1] += idd
    md1['2'] += 1
    print('incre proc')

if __name__ == '__main__':

    mn = mp.Manager()

    md1 = mn.dict()
    md1[1] = ''
    md1['2'] = 1

    pc = 0
    procs = []

    for x in range(20):

        pc = pc + 1

        idd = str(pc).zfill(2)
        
        trd = mp.Process(target = test_func_proc, args=[idd, ])
        trd.daemon = True
        trd.start()

        procs.append(trd)

        md1[1] += 'TU'
        md1['2'] += 1
        print('incre loop')
        local_func()


    md1[1] += 'CX'
    md1['2'] += 1
    print('incre main')
    

    while True:

        end = True

        for proc in procs:

            if proc.is_alive():
                end = False
        
        if end is True:
            break


    print('md1 %s' % md1)

Tags: trueif进程mpprocdictendfunc
1条回答
网友
1楼 · 发布于 2024-09-21 01:07:46

你错过了什么?像md1[1] += 'TU'md1['2'] += 1这样的操作不是原子的。特别是,要执行md1[1] += 'TU',即附加到一个在Python中不可变的字符串,解释器必须首先获取旧字符串,然后创建一个新的字符串,它是旧字符串和'TU'的串联,最后存储结果。这可以在多个进程中并行进行,最后一个存储结果的进程可以覆盖在不同进程中创建的结果上。因此,您需要确保一旦一个进程启动了fetch-concatenate-store系列操作,就不会有其他进程同时对同一字符串执行相同的操作(或整数增量操作)。确保序列化的唯一方法是使用Lock

考虑到函数test_func_proc是100%CPU(调试打印语句除外),我建议不要盲目地在一台CPU核数较少的计算机上创建20个进程,而应该创建一个处理池,其大小受实际拥有的CPU核数的限制。但是Lock不能作为参数传递给辅助函数。相反,池中的每个进程将Lock实例视为一个全局变量,该变量已使用特殊的池初始值设定项函数初始化

import multiprocessing as mp

def local_func(lock):
    with lock:
        md1[1] += 'DR'
        md1['2'] += 1
    print('incre func')


def init_pool(l):
    global lock
    lock = l

def test_func_proc(md1, idd):
    with lock:
        md1[1] += idd
        md1['2'] += 1
    print('incre proc')

if __name__ == '__main__':
    mn = mp.Manager()
    md1 = mn.dict()

    md1[1] = ''
    md1['2'] = 1

    # create the Lock instance
    lock = mp.Lock()

    pc = 0
    # limit size of the pool to the smaller of 20 or the number of CPU cores we have:
    n_processes = min(20, mp.cpu_count())
    # initialize each process in the pool with our lock
    pool = mp.Pool(processes=n_processes, initializer=init_pool, initargs=(lock,))
    for x in range(20):
        pc = pc + 1
        idd = str(pc).zfill(2)
        # We need to explicitly pass md1 as an argument
        # or we could have instead initialized each process in the pool like we did with lock:
        pool.apply_async(test_func_proc, args=(md1, idd))
        # only modify the dictionary entries after acquiring the lock:
        with lock:
            md1[1] += 'TU'
            md1['2'] += 1
        # now the lock has been released
        print('incre loop')
        local_func(lock)

    with lock:
        md1[1] += 'CX'
        md1['2'] += 1
    print('incre main')

    # wait for pool tasks to end:
    pool.close()
    pool.join()

    print('md1 %s' % md1)
    print(len(md1[1]))

印刷品:

incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre loop
incre func
incre main
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
incre proc
md1 {1: 'TUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRTUDRCX0103040607021012051314151811080919201617', '2': 62}
122

更新

阅读关于Augmented Assignment Statements的文档,其中部分说明(但您应该阅读整个部分):

With the exception of assigning to tuples and multiple targets in a single statement, the assignment done by augmented assignment statements is handled the same way as normal assignments.

以及一个分解Python代码的演示,以揭示+=运算符的非原子性(执行b += 1需要4条指令):

>>> import dis
>>>
>>> a = 1
>>> b = 1
>>>
>>> def foo():
...     a = a + 1
...     b += 1
...
>>> dis.dis(foo)
  2           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 BINARY_ADD
              6 STORE_FAST               0 (a)

  3           8 LOAD_FAST                1 (b)
             10 LOAD_CONST               1 (1)
             12 INPLACE_ADD
             14 STORE_FAST               1 (b)
             16 LOAD_CONST               0 (None)
             18 RETURN_VALUE

问题在于,两个进程可以同时执行8到14的指令,加载相同的b值并将b增加到相同的新值。您需要确保一次只能有一个进程执行从8到14的指令。

相关问题 更多 >

    热门问题