Python在多处理进程之间共享一个deque

2024-05-06 21:36:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pas-hour上一直在看以下问题,但没有任何运气:

Python sharing a dictionary between parallel processes

multiprocessing: sharing a large read-only object between processes?

multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

我写了一个非常基本的测试文件来说明我要做的事情:

from collections import deque
from multiprocessing import Process
import numpy as np


class TestClass:
    def __init__(self):
        self.mem = deque(maxlen=4)
        self.process = Process(target=self.run)

    def run(self):
        while True:
            self.mem.append(np.array([0, 1, 2, 3, 4]))


def print_values(x):
    while True:
        print(x)


test = TestClass()
process = Process(target=print_values(test.mem))

test.process.start()
process.start()

目前,它输出以下内容:

^{pr2}$

如何从主代码或运行“打印值”的进程访问mem值?在


Tags: fromtestimportselfobjectdefbetweenmultiprocessing
2条回答

不幸的是,multiprocessing.Manager()不支持deque,但它可以与listdictQueueValue和{}。一个list非常接近,所以我在下面的示例中使用了它。。在

from multiprocessing import Process, Manager, Lock
import numpy as np

class TestClass:
    def __init__(self):
        self.maxlen = 4
        self.manager = Manager()
        self.mem = self.manager.list()
        self.lock = self.manager.Lock()
        self.process = Process(target=self.run, args=(self.mem, self.lock))

    def run(self, mem, lock):
        while True:
            array = np.random.randint(0, high=10, size=5)
            with lock:
                if len(mem) >= self.maxlen:
                    mem.pop(0)
                mem.append(array)

def print_values(mem, lock):
    while True:
        with lock:
            print mem

test = TestClass()
print_process = Process(target=print_values, args=(test.mem, test.lock))
test.process.start()
print_process.start()

test.process.join()
print_process.join()

在使用管理器对象时要小心一点。你可以像它们引用的对象一样使用它们,但是你不能做像。。。mem = mem[-4:]以截断值,因为您正在更改被引用对象。在

至于编码风格,我可能会将Manager对象移到类之外,或者将print_values函数移到类内,但举个例子,这是可行的。如果要移动对象,只需注意不能在run方法中直接使用self.mem。您需要在启动进程时传递它,否则python在后台执行的fork将创建一个新实例,并且不会共享它。在

希望这对你的情况有用,如果不行,我们可以尝试调整一下。在

因此,通过结合@bivouac0提供的代码和@Marijn Pieters发布的评论,我想出了以下解决方案:

from multiprocessing import Process, Manager, Queue


class testClass:
    def __init__(self, maxlen=4):
        self.mem = Queue(maxsize=maxlen)
        self.process = Process(target=self.run)

    def run(self):
        i = 0

        while True:
            self.mem.empty()
            while not self.mem.full():
                self.mem.put(i)
                i += 1


def print_values(queue):
    while True:
        values = queue.get()
        print(values)


if __name__ == "__main__":
    test = testClass()
    print_process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()

相关问题 更多 >