Python多处理。传递ctype对象时队列暂停

2024-09-29 00:14:39 发布

您现在位置:Python中文网/ 问答频道 /正文

以下程序执行以下操作:

  1. 父进程创建数据类型为SHARED_DTYPE的进程间共享值
  2. 父进程创建进程间队列,以将对象从子进程传递到父进程
  3. 父进程生成子进程(并等待对象通过进程间队列到达)
  4. 子进程修改进程间共享值的值
  5. 子进程创建数据类型为TRAVELLER_DTYPE的对象
  6. 子进程通过进程间队列传递创建的对象
  7. 父进程通过进程间队列接收对象
from multiprocessing import Value, Process, Queue
import ctypes

SHARED_DTYPE = ctypes.c_int
TRAVELLER_DTYPE = ctypes.c_float

shared_value = Value(SHARED_DTYPE, 0)
print('type of shared_value =', type(shared_value))
print('shared_value =', shared_value.value)

def child_proc():
    try:
        shared_value.value = 1
        obj = TRAVELLER_DTYPE(5)
        print('send into queue =', obj)
        q.put(obj)
    except BaseException as e:
        print(e)
    finally:
        print('child_proc process is finished')

if __name__ == "__main__":
    try:
        q = Queue()
        cp = Process(target=child_proc)
        cp.start()
        cp.join()

        print('shared_value =', shared_value.value)
        obj = q.get()
        print('recv from queue =', obj)
    except BaseException as e:
        print(e)
    finally:
        print('__main__ process is finished')

现在,如果运行上述程序,它将正常工作,并给出以下输出:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_float(5.0)
child_proc process is finished
shared_value = 1
recv from queue = c_float(5.0)
__main__ process is finished

但是,如果我们将程序顶部的TRAVELLER_DTYPE更改为ctypes.c_int,它将不再正常工作

有时,它会给出以下输出:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_int(5)
child_proc process is finished
shared_value = 1
^C                               <-- Pressed ctrl-C here, was hung indefinitely.
__main__ process is finished

而在其他情况下,它会提供以下输出:

type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = c_int(5)
child_proc process is finished
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.8/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
    assert_spawning(obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: c_int objects should only be shared between processes through inheritance
shared_value = 1
^C                            <-- Pressed ctrl-C here, was hung indefinitely.
__main__ process is finished

为什么?

通常,当且仅当SHARED_DTYPE != TRAVELLER_DTYPE

是否需要一些显式锁定对象


Python多处理doc page没有提到任何此类问题

联机搜索错误消息不会提供任何相关信息/线索:


Tags: 对象childobjqueue进程isvalueproc
1条回答
网友
1楼 · 发布于 2024-09-29 00:14:39

奇怪的是,当两种类型不相同时,它可以工作,但当它们相同时,它就失败了。上面提到的bug报告看起来相关,但很旧。这似乎是一个bug。一种解决方法是,与值对象不同,队列对象不需要是ctypes类型(也许不应该是ctypes),因此您可以使用intfloat来代替,并且它可以工作

我假设您在Linux上运行,但在Windows上,它使用进程的派生与分叉,并且通过派生将脚本导入子进程,使全局变量在进程之间的实例不同。这甚至会使您的“工作”场景在Windows上失败。相反,应该将队列和共享值作为参数传递给子工作者,确保它们作为相同的对象正确继承(这可能就是错误消息所指的)

在下面,我还重新安排了用于生成的代码,以便它可以在Windows和Linux上工作:

from multiprocessing import Value, Process, Queue
import ctypes

SHARED_DTYPE = ctypes.c_int
TRAVELLER_DTYPE = int

def child_proc(q,shared_value):
    shared_value.value = 1
    obj = TRAVELLER_DTYPE(5)
    print('send into queue =', obj)
    q.put(obj)
    print('child_proc process is finished')

if __name__ == "__main__":
    shared_value = Value(SHARED_DTYPE, 0)
    print('type of shared_value =', type(shared_value))
    print('shared_value =', shared_value.value)
    q = Queue()
    cp = Process(target=child_proc,args=(q,shared_value))
    cp.start()
    cp.join()

    print('shared_value =', shared_value.value)
    obj = q.get()
    print('recv from queue =', obj)
    print('__main__ process is finished')
type of shared_value = <class 'multiprocessing.sharedctypes.Synchronized'>
shared_value = 0
send into queue = 5
child_proc process is finished
shared_value = 1
recv from queue = 5
__main__ process is finished

相关问题 更多 >