理解多处理:Python中的共享内存管理、锁和队列

2024-05-08 23:40:01 发布

您现在位置:Python中文网/ 问答频道 /正文

Multiprocessing是python中一个强大的工具,我想更深入地理解它。 我想知道何时使用regularLocksQueues以及何时使用多处理Manager在所有进程之间共享这些。

我提出了以下测试场景,其中有四种不同的多处理条件:

  1. 使用游泳池和经理

  2. 使用游泳池和管理器

  3. 使用个别流程和经理

  4. 使用单个流程和管理器

工作

所有条件都执行作业函数the_jobthe_job包含一些由锁保护的打印。此外,函数的输入只是放入一个队列中(看看它是否可以从队列中恢复)。这个输入只是在名为start_scenario的主脚本中创建的idx的索引(显示在底部)。

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

条件的成功被定义为完全调用输入 从队列中,查看底部的函数read_queue

条件

条件1和条件2相当不言自明。 条件1包括创建锁和队列,并将它们传递给流程池:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(helper函数make_iterator在本文的底部给出。) 条件1失败,出现RuntimeError: Lock objects should only be shared between processes through inheritance

条件2非常相似,但现在锁和队列由管理器管理:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

在条件3中,手动启动新进程,并且在没有管理器的情况下创建锁和队列:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

条件4类似,但现在再次使用管理器:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

在两种情况下-3和4-我开始一个新的 处理the_job的10个任务中的每一个,最多有ncores个进程 同时操作。这是通过以下助手函数实现的:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

结果

只有条件1失败(RuntimeError: Lock objects should only be shared between processes through inheritance),而其他3个条件成功。我试着用我的头脑来思考这个结果。

为什么池需要在所有进程之间共享锁和队列,但条件3中的各个进程不需要?

我知道的是,对于池条件(1和2),来自迭代器的所有数据都是通过pickling传递的,而在单进程条件(3和4)中,来自迭代器的所有数据都是通过主进程的继承传递的(我使用的是Linux)。 我想,在从子进程中更改内存之前,将访问父进程使用的相同内存(写时复制)。但是只要有人说lock.acquire(),就应该更改它,并且子进程确实在内存中的其他地方使用不同的锁,不是吗?一个子进程如何知道一个兄弟激活了一个不通过管理器共享的锁?

最后,有点相关的是我的问题,条件3和条件4有多大的不同。两者都有单独的过程,但它们在管理器的使用上有所不同。两者都被认为是有效的代码吗?或者,如果实际上不需要经理,那么应该避免使用经理吗?


完整脚本

对于那些只想复制和粘贴所有内容来执行代码的人,下面是完整的脚本:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

Tags: thelockforqueuemanagerargsjobmp
1条回答
网友
1楼 · 发布于 2024-05-08 23:40:01

multiprocessing.Lock是使用操作系统提供的信号量对象实现的。在Linux上,子对象只是通过os.fork从父对象继承信号量的句柄。这不是信号量的副本;它实际上继承了父进程拥有的相同句柄,继承文件描述符的方式也一样。另一方面,Windows不支持os.fork,因此它必须对Lock进行pickle。它通过使用Windows ^{}API为multiprocessing.Lock对象在内部使用的Windows信号量创建一个重复的句柄来实现,该API声明:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles

通过DuplicateHandleAPI,您可以将复制句柄的所有权授予子进程,以便子进程在取消对其的绑定后可以实际使用它。通过创建子对象拥有的重复句柄,可以有效地“共享”锁对象。

这里是multiprocessing/synchronize.py中的信号量对象

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

注意assert_spawning调用中的__getstate__,它在拾取对象时被调用。这是如何实现的:

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

这个函数通过调用thread_is_spawning,确保您“继承”了Lock。在Linux上,该方法只返回False

@staticmethod
def thread_is_spawning():
    return False

这是因为Linux不需要pickle来继承Lock,所以如果__getstate__实际上是在Linux上调用的,我们就不能继承。在Windows上,还有更多:

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

这里,thread_is_spawning如果Popen._tls对象具有process_handle属性,则返回True。我们可以看到在__init__中创建了process_handle属性,然后使用dump将要继承的数据从父级传递到子级,然后删除该属性。因此thread_is_spawning将只在__init__期间是True。根据this python-ideas mailing list thread,这实际上是一个人为的限制,用于模拟Linux上的os.fork行为。Windows实际上可以在任何时候支持传递Lock,因为DuplicateHandle可以在任何时候运行。

以上所有内容都适用于Queue对象,因为它在内部使用Lock

我想说,继承Lock对象比使用Manager.Lock()对象更好,因为当您使用Manager.Lock时,您对Lock的每个调用都必须通过IPC发送到Manager进程,这将比使用位于调用进程内的共享Lock进程慢得多。不过,这两种方法都是完全有效的。

最后,可以不使用Manager,使用initializer/initargs关键字参数,将Lock传递给Pool的所有成员:

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

这是因为传递给initargs的参数被传递给在Pool内运行的Process对象的__init__方法,所以它们最终被继承,而不是被pickle。

相关问题 更多 >