多处理中的共享内存

2024-09-28 19:27:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我有三个大单子。第一个包含位数组(模块位数组0.8.0),另外两个包含整数数组。

l1=[bitarray 1, bitarray 2, ... ,bitarray n]
l2=[array 1, array 2, ... , array n]
l3=[array 1, array 2, ... , array n]

这些数据结构需要相当多的RAM(总共约16GB)。

如果我使用以下命令启动12个子进程:

multiprocessing.Process(target=someFunction, args=(l1,l2,l3))

这是否意味着将为每个子流程复制l1、l2和l3,或者子流程共享这些列表?或者更直接地说,我会使用16GB还是192GB的RAM?

某些函数将从这些列表中读取一些值,然后根据读取的值执行一些计算。结果将返回到父进程。列表l1、l2和l3不会被someFunction修改。

因此,我假设子进程不需要,也不会复制这些巨大的列表,而是与父进程共享它们。这意味着,由于linux下的“写时拷贝”方法,程序将占用16GB的RAM(无论启动多少个子进程)? 我是对的还是我遗漏了一些会导致列表被复制的东西?

编辑: 我读了更多关于这个问题的文章后,仍然感到困惑。一方面,Linux使用copy-On-write,这意味着没有数据被复制。另一方面,访问对象将更改其ref计数(我仍然不确定这意味着什么)。即便如此,整个对象是否会被复制?

例如,如果我定义某个函数如下:

def someFunction(list1, list2, list3):
    i=random.randint(0,99999)
    print list1[i], list2[i], list3[i]

使用此函数是否意味着将为每个子进程完全复制l1、l2和l3?

有办法检查一下吗?

EDIT2在读取更多数据并在子进程运行时监视系统的总内存使用情况之后,似乎确实为每个子进程复制了整个对象。这似乎是因为引用计数。

在我的程序中,l1、l2和l3的引用计数实际上是不需要的。这是因为在父进程退出之前,l1、l2和l3将保留在内存中(保持不变)。在此之前,不需要释放这些列表使用的内存。事实上,我确信在程序退出之前,引用计数将保持在0以上(对于这些列表和这些列表中的每个对象)。

所以现在问题变成了,我如何确保对象不会被复制到每个子进程?我可以为这些列表和列表中的每个对象禁用引用计数吗?

EDIT3只是一个附加说明。子进程不需要修改l1l2l3或这些列表中的任何对象。子进程只需要能够引用其中一些对象,而不需要为每个子进程复制内存。


Tags: 对象函数内存程序l1列表进程数组
3条回答

如果您想使用写时拷贝特性,并且您的数据是静态的(在子进程中保持不变),那么您应该让python不处理数据所在的内存块。您可以很容易地通过使用C或C++结构(例如STL)作为容器,并提供自己的Python包装器,当Python级对象将被创建时,它将使用指针指向数据存储器(或者可能复制数据MEM)。 所有这些都可以通过python的简单性和使用cython的语法轻松完成。

# pseudo cython
cdef class FooContainer:
   cdef char * data
   def __cinit__(self, char * foo_value):
       self.data = malloc(1024, sizeof(char))
       memcpy(self.data, foo_value, min(1024, len(foo_value)))

   def get(self):
       return self.data

# python part
from foo import FooContainer

f = FooContainer("hello world")
pid = fork()
if not pid:
   f.get() # this call will read same memory page to where
           # parent process wrote 1024 chars of self.data
           # and cython will automatically create a new python string
           # object from it and return to caller

上面的伪代码写得不好。别用它。代替Self。在你的情况下数据应该是C或C++容器。

一般来说,共享同一数据有两种方法:

  • 多线程
  • 共享内存

Python的多线程不适合于CPU绑定的任务(因为GIL),所以在这种情况下通常的解决方案是继续multiprocessing。但是,对于这个解决方案,您需要使用^{}^{}显式地共享数据。

注意,由于所有的同步问题,通常在进程之间共享数据可能不是最佳选择;一种涉及参与者交换消息的方法通常被视为更好的选择。另请参见Python documentation

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

在您的例子中,您需要以某种方式包装l1l2l3,这可以通过multiprocessing(例如使用multiprocessing.Array)理解,然后将它们作为参数传递。
还要注意,正如您所说,您不需要写访问,那么您应该在创建对象时传递lock=False,否则所有访问仍将被序列化。

因为这在google上仍然是一个很高的结果,而且还没有其他人提到它,我想我会提到python 3.8.0版本中引入的“true”共享内存的新可能性:https://docs.python.org/3/library/multiprocessing.shared_memory.html

我在这里提供了一个小的人工示例(在linux上测试),其中使用了numpy数组,这可能是一个非常常见的用例:

# one dimension of the 2d array which is shared
dim = 5000

import numpy as np
from multiprocessing import shared_memory, Process, Lock
from multiprocessing import cpu_count, current_process
import time

lock = Lock()

def add_one(shr_name):

    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray((dim, dim,), dtype=np.int64, buffer=existing_shm.buf)
    lock.acquire()
    np_array[:] = np_array[0] + 1
    lock.release()
    time.sleep(10) # pause, to see the memory usage in top
    print('added one')
    existing_shm.close()

def create_shared_block():

    a = np.ones(shape=(dim, dim), dtype=np.int64)  # Start with an existing NumPy array

    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # # Now create a NumPy array backed by shared memory
    np_array = np.ndarray(a.shape, dtype=np.int64, buffer=shm.buf)
    np_array[:] = a[:]  # Copy the original data into shared memory
    return shm, np_array

if current_process().name == "MainProcess":
    print("creating shared block")
    shr, np_array = create_shared_block()

    processes = []
    for i in range(cpu_count()):
        _process = Process(target=add_one, args=(shr.name,))
        processes.append(_process)
        _process.start()

    for _process in processes:
        _process.join()

    print("Final array")
    print(np_array[:10])
    print(np_array[10:])

    shr.close()
    shr.unlink()

请注意,由于64位整数,此代码可能需要大约1gb的ram才能运行,因此请确保不会冻结使用它的系统。^_^

相关问题 更多 >