Multiprocessing is a powerful tool in Python, and I want to understand it more in depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share these among all processes.

I came up with the following testing scenarios with four different conditions for multiprocessing:

1. Using a pool and NO Manager
2. Using a pool and a Manager
3. Using individual processes and NO Manager
4. Using individual processes and a Manager

All conditions execute the job function the_job. the_job consists of some printing which is secured by a lock. Moreover, the input to the function is simply put into a queue (to see whether it can be recovered from the queue). This input is simply an index idx (taken from range(10) in the main script start_scenario, shown at the bottom).
def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and
    finally puts the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]
    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()
    queue.put(idx)
The success of a condition is defined as perfectly recalling the input from the queue; see the function read_queue at the bottom.

Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue and passing these to a process pool:
def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)
(The helper function make_iterator is given at the bottom of this post.)

Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.

Condition 2 is rather similar, but now the lock and queue are supervised by a Manager:
def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)
In condition 3, new processes are started manually, and the lock and queue are created without a Manager:
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager.

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)
Condition 4 is similar, but now a Manager is used again:
def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager.

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)
In both conditions, 3 and 4, I start a new process for each of the 10 tasks of the_job, with at most ncores processes operating at the same time. This is achieved with the following helper function:
def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses
    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc, args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)
Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance), whereas the other three conditions succeed. I am trying to wrap my head around this outcome.

Why does the pool need to share a lock and queue between all processes, while the individual processes from condition 3 don't?

What I know is that for the pool conditions (1 and 2) all data from the iterators is passed via pickling, whereas in the single-process conditions (3 and 4) all data from the iterators is passed via inheritance from the main process (I am using Linux). I guess that until the memory is changed from within a child process, the same memory the parent process uses is accessed (copy-on-write). But as soon as one calls lock.acquire(), this should be changed, and the child processes would then use different locks placed somewhere else in memory, wouldn't they? How does one child process know that a sibling has acquired a lock that is not shared via a Manager?

Finally, somewhat related is the question of how different conditions 3 and 4 really are. Both use individual processes, but they differ in the usage of a Manager. Is both considered valid code? Or should one avoid using a Manager if there is actually no need for one?
For those who just want to copy and paste everything to execute the code, here is the full script:
__author__ = 'Me and myself'

import multiprocessing as mp
import time


def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and
    finally puts the input into a queue.
    """
    idx = args[0]
    lock = args[1]
    queue = args[2]
    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who = ' By run %d \n' % idx
    print who
    lock.release()
    queue.put(idx)


def read_queue(queue):
    """Turns a queue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock and queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number=1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4
    """
    args = range(10)
    ncores = 3
    if scenario_number == 1:
        result = scenario_1_pool_no_manager(the_job, args, ncores)
    elif scenario_number == 2:
        result = scenario_2_pool_manager(the_job, args, ncores)
    elif scenario_number == 3:
        result = scenario_3_single_processes_no_manager(the_job, args, ncores)
    elif scenario_number == 4:
        result = scenario_4_single_processes_manager(the_job, args, ncores)
    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!
    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()
    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager.

    SUCCESSFUL!
    """
    lock = mp.Lock()
    queue = mp.Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager.

    SUCCESSFUL!
    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()
    iterator = make_iterator(args, lock, queue)
    do_job_single_processes(jobfunc, iterator, ncores)
    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time.

    :param jobfunc: Job to do
    :param iterator:
        Iterator over different parameter settings,
        contains a lock and a queue
    :param ncores:
        Number of processes operating at the same time
    """
    keep_running = True
    process_dict = {}  # Dict containing all subprocesses
    while len(process_dict) > 0 or keep_running:
        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():
            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)
        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)
        # If we have fewer active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc, args=(task,))
                proc.start()
                process_dict[proc.pid] = proc
            except StopIteration:
                # All tasks have been started
                keep_running = False
        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios."""
    start_scenario(1)


if __name__ == '__main__':
    main()
multiprocessing.Lock is implemented using a semaphore object provided by the operating system. On Linux, the child just inherits a handle to the semaphore from the parent via os.fork. This isn't a copy of the semaphore; it's actually inheriting the same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by using the Windows DuplicateHandle API to create a duplicate handle to the Windows semaphore that the multiprocessing.Lock object uses internally. DuplicateHandle lets you give ownership of the duplicated handle to the child process, so that the child can actually use it after unpickling it. By creating a duplicate handle owned by the child, you can effectively "share" the lock object.

Here is the semaphore object in multiprocessing/synchronize.py:
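(The following is abridged from CPython 2.7's Lib/multiprocessing/synchronize.py; only the pickling-related methods are shown.)

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

    def __getstate__(self):
        # Called when the object is pickled; refuses unless a child
        # process is currently being spawned.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state):
        # Rebuilds the lock in the child from the duplicated handle.
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        self._make_methods()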
Note the assert_spawning call in __getstate__, which gets called while pickling the object. Here is how that is implemented:
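(Abridged from CPython 2.7's Lib/multiprocessing/forking.py; note that this is exactly where the RuntimeError from condition 1 is raised.)

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )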
That function is what makes sure you are "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:
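(From the Unix Popen class in CPython 2.7's Lib/multiprocessing/forking.py.)

    @staticmethod
    def thread_is_spawning():
        return False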
This is because Linux doesn't need to pickle to inherit a Lock, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there is more going on:
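(Abridged from the Windows Popen class in CPython 2.7's Lib/multiprocessing/forking.py; the __init__ body is shortened to the part relevant here.)

    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()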
Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to the child using dump, and then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the behavior of os.fork on Linux; Windows could in fact support passing the Lock at any time, because DuplicateHandle can be run at any time.

All of the above applies to the Queue object as well, because it uses a Lock internally.
I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which will be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.

Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, via the initializer/initargs keyword arguments:
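(A minimal sketch of this approach; the helper name initialize_lock is illustrative, and the queue is left out for brevity.)

lock = None  # global inside each worker process, set by the initializer

def initialize_lock(l):
    global lock
    lock = l

def scenario_pool_with_initializer(jobfunc, args, ncores):
    """Runs a pool whose workers inherit the lock via the initializer."""
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    mypool.map(jobfunc, args)
    mypool.close()
    mypool.join()

The job function then uses the global lock instead of receiving it as part of its argument tuple.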
This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited rather than pickled.