<p>What you are doing is implementing your own multiprocessing pool, but why? Are you not aware of the existence of the <code>concurrent.futures.ProcessPoolExecutor</code> and <code>multiprocessing.pool.Pool</code> classes, the latter of which is actually better suited to your particular problem?</p>
<p>Both classes implement multiprocessing pools and various methods for submitting tasks to the pool and getting results back from those tasks. But since in your particular case the tasks you are submitting are all attempting to solve the same problem and you are only interested in the first available result, as soon as one task completes you need to be able to terminate any remaining running tasks. Only <code>multiprocessing.pool.Pool</code> allows you to do that.</p>
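<p>As a quick illustration of the difference, here is a minimal sketch (assuming Python 3.9+, where <code>shutdown</code> accepts a <code>cancel_futures</code> argument): <code>ProcessPoolExecutor</code> can only cancel tasks that have not yet started running, so a long task that is already executing runs to completion:</p>
<pre class="lang-py prettyprint-override"><code>from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait
import time

def worker(seconds):
    time.sleep(seconds) # emulate working on the problem
    return seconds

if __name__ == '__main__':
    t = time.time()
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(worker, s) for s in (1, 3)]
        wait(futures, return_when=FIRST_COMPLETED) # first result after ~1 second
        # cancel_futures only cancels tasks still waiting in the queue;
        # the 3-second task is already running and cannot be stopped:
        executor.shutdown(cancel_futures=True)
    # prints ~3 seconds, not ~1, because shutdown waits for the running task:
    print('Total elapsed time:', time.time() - t)
</code></pre>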
<p>The code below uses method <code>Pool.apply_async</code> to submit a task. This call does not block but instead returns an <code>AsyncResult</code> instance that has a blocking <code>get</code> method you can call to obtain the result of the submitted task. But since in general you may be submitting many tasks, we would not know which of these instances to call <code>get</code> on first. So the solution is instead to use the <code>callback</code> parameter of <code>apply_async</code>, specifying a function that will be invoked asynchronously with a task's return value as soon as it becomes available. The problem then becomes communicating that result back. There are two ways:</p>
<p><strong>Method 1: Using a Global Variable</strong></p>
<pre class="lang-py prettyprint-override"><code>from multiprocessing import Pool
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(answer):
global solution
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
solution = answer
pool.terminate() # kill all tasks
if __name__ == '__main__':
t = time.time()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for all tasks to terminate:
pool.close()
pool.join()
print(solution)
print('Total elapsed time:', time.time() - t)
</code></pre>
<p>Prints:</p>
<pre><code>9
Total elapsed time: 1.1378364562988281
</code></pre>
<p><strong>Method 2: Using a Queue</strong></p>
<pre class="lang-py prettyprint-override"><code>from multiprocessing import Pool
from queue import Queue
import time
def worker1(x):
time.sleep(3) # emulate working on the problem
return 9 # the solution
def worker2(x):
time.sleep(1) # emulate working on the problem
return 9 # the solution
def callback(solution):
# gets all the returned results from submitted tasks
# since we are just interested in the first returned result, write it to the queue:
q.put_nowait(solution)
if __name__ == '__main__':
t = time.time()
q = Queue()
pool = Pool(2) # just two processes in the pool for demo purposes
# submit two tasks:
pool.apply_async(worker1, args=(1,), callback=callback)
pool.apply_async(worker2, args=(2,), callback=callback)
# wait for first returned result from callback:
solution = q.get()
print(solution)
pool.terminate() # kill all tasks in the pool
print('Total elapsed time:', time.time() - t)
</code></pre>
<p>Prints:</p>
<pre><code>9
Total elapsed time: 1.1355643272399902
</code></pre>
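<p>Incidentally, a plain <code>queue.Queue</code> is sufficient here (no <code>multiprocessing.Queue</code> required) because, as far as I know, the callback is invoked in a result-handler thread belonging to the main process, not in a worker process. A quick sketch to convince yourself of this:</p>
<pre class="lang-py prettyprint-override"><code>import os
import threading
import time
from multiprocessing import Pool

def worker(x):
    time.sleep(1) # emulate working on the problem
    return 9

def callback(result):
    # same PID as the main process, but a different (result-handler) thread:
    print('callback pid:', os.getpid(), 'thread:', threading.current_thread().name)

if __name__ == '__main__':
    print('main pid:', os.getpid(), 'thread:', threading.current_thread().name)
    pool = Pool(2)
    pool.apply_async(worker, args=(1,), callback=callback)
    pool.close()
    pool.join()
</code></pre>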
<p><strong>Update</strong></p>
<p>Even under Windows, the time to create and re-create pools may be relatively small compared with how long the tasks take to complete, especially for the later iterations, i.e. larger values of <code>n</code>. If it is the same worker function being invoked each time, then a third method is to use pool method <code>imap_unordered</code>. I also include some code that measures, on my desktop, what the overhead of starting a new pool instance is:</p>
<pre class="lang-py prettyprint-override"><code>from multiprocessing import Pool
import time
def worker(x):
time.sleep(x) # emulate working on the problem
return 9 # the solution
if __name__ == '__main__':
# POOLSIZE = multiprocessing.cpu_count()
POOLSIZE = 8 # on my desktop
# how long does it take to start a pool of size 8?
t1 = time.time()
for i in range(16):
pool = Pool(POOLSIZE)
pool.terminate()
t2 = time.time()
print('Average pool creation time: ', (t2 - t1) / 16)
# POOLSIZE number of calls:
arguments = [7, 6, 1, 3, 4, 2, 9, 6]
pool = Pool(POOLSIZE)
t1 = time.time()
results = pool.imap_unordered(worker, arguments)
it = iter(results)
first_solution = next(it)
t2 = time.time()
pool.terminate()
print('Total elapsed time:', t2 - t1)
print(first_solution)
</code></pre>
<p>Prints:</p>
<pre><code>Average pool creation time: 0.053139880299568176
Total elapsed time: 1.169790506362915
9
</code></pre>
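<p>Since even that small pool-creation cost recurs on every iteration, a variation (just a sketch, assuming the same <code>worker</code> function is used for each problem instance) is to create the pool once and call <code>imap_unordered</code> per iteration. The caveat is that leftover tasks from one iteration keep running and occupy workers into the next, which is precisely the dilemma addressed in Update 2 below:</p>
<pre class="lang-py prettyprint-override"><code>from multiprocessing import Pool
import time

def worker(x):
    time.sleep(x) # emulate working on the problem
    return 9 # the solution

if __name__ == '__main__':
    with Pool(8) as pool: # pool is terminated automatically on exiting the block
        for n in range(3): # hypothetical outer loop over successive problem instances
            arguments = [2, 1, 3] # would normally depend on n
            # imap_unordered yields results in completion order, so the
            # first value yielded comes from the first task to finish:
            first_solution = next(iter(pool.imap_unordered(worker, arguments)))
            print('iteration', n, 'solution:', first_solution)
</code></pre>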
<p><strong>Update 2</strong></p>
<p>This is the dilemma: you have multiple processes working on pieces of a puzzle. When one process discovers, for example, that the number is divisible by some number in the range it was given, there is no point in the tasks running in other processes, which are testing different ranges, completing their tests. You can do one of three things. You can do nothing and let the processes finish before starting the next iteration, but that delays the next iteration. I have already suggested that you terminate the processes, which frees up the processors, but that requires you to create new processes, which you found unsatisfactory.</p>
<p>I can only think of one other possibility, which I demonstrate below using your approach to multiprocessing. A multiprocessing shared-memory variable named <code>stop</code> is initialized as a global in each process and set to 0 before each iteration. When a task is about to return a value of 0, and there is therefore no point in tasks running in other processes continuing, it sets the value of <code>stop</code> to 1. This means that tasks must periodically check the value of <code>stop</code> and return if it has been set to 1. <strong>Of course, this adds extra cycles to the processing.</strong> In the demo below I actually have 100 tasks queued up for 8 processors. But the final 92 tasks will immediately find that <code>stop</code> has been set and should return on their first iteration.</p>
<p>By the way: the original code used a <code>multiprocessing.JoinableQueue</code> instance for queuing tasks rather than a <code>multiprocessing.Queue</code> instance, and called <code>task_done</code> on this instance whenever a message was taken off the queue. Yet <code>join</code> was never called on this queue (which would tell you when all the messages had been taken off), defeating the whole purpose of having such a queue. In fact, no <code>JoinableQueue</code> is needed, since the main process has submitted <code>num_jobs</code> jobs and therefore expects <code>num_jobs</code> messages on the results queue, so it can simply loop and pull the expected number of results off the results queue. I have replaced the <code>JoinableQueue</code> with a simple <code>Queue</code>, keeping the original code but commented out. Also, the <code>Consumer</code> processes can be created as daemon processes (with argument <code>daemon=True</code>), and then they will terminate automatically when all non-daemon processes (i.e. the main process) terminate, obviating the need for the special &quot;poison pill&quot; <code>None</code> task messages. I have made that change and, again, kept the original code intact but commented out for comparison.</p>
<pre class="lang-py prettyprint-override"><code>import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue, stop):
# make ourself a daemon process:
multiprocessing.Process.__init__(self, daemon=True)
self.task_queue = task_queue
self.result_queue = result_queue
self.stop = stop
def run(self):
global stop
stop = self.stop
while True:
next_task = self.task_queue.get()
"""
if next_task is None:
# Poison pill shutdown of .get() loop with break
#self.task_queue.task_done()
break
"""
answer = next_task()
#self.task_queue.task_done()
self.result_queue.put(answer)
# return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
global stop
# start the range from 1 to avoid dividing by 0:
for i in range(1, self.b):
# how frequently should this check be made?
if stop.value == 1:
return 0
if self.a % i == 0:
stop.value = 1
return 0
return 1
if __name__ == '__main__':
# Establish communication queues
#tasks = multiprocessing.JoinableQueue()
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
# Number of consumers equal to system cpu_count
num_consumers = multiprocessing.cpu_count()
# Make a list of Consumer object process' ready to be opened.
stop = multiprocessing.Value('i', 0)
consumers = [ Consumer(tasks, results, stop) for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
# many more jobs than processes, but they will stop immediately once they check the value of stop.value:
num_jobs = 100
stop.value = 0 # make sure it is 0 before an iteration
for i in range(num_jobs):
tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.
# We start to .get() the results in a different loop-
results = [results.get() for _ in range(num_jobs)]
print(results)
print(0 in results)
"""
# Add a poison pill for each consumer
for i in range(num_consumers): # We only do this when all computation is done.
tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
"""
</code></pre>
<p>Prints:</p>
<pre><code>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
True
</code></pre>
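<p>Finally, regarding the &quot;how frequently should this check be made?&quot; comment above: reading <code>stop.value</code> acquires the <code>Value</code>'s lock on every loop iteration. One possible refinement, purely as a sketch with a hypothetical <code>CHECK_EVERY</code> interval, is to test the flag only every so many iterations, at the cost of a task running up to that many extra iterations after the flag has been set:</p>
<pre class="lang-py prettyprint-override"><code>class Task(object):
    CHECK_EVERY = 1000 # hypothetical check interval; tune to your workload

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        global stop
        # start the range from 1 to avoid dividing by 0:
        for i in range(1, self.b):
            # pay for the synchronized shared-memory read only
            # once every CHECK_EVERY iterations:
            if i % self.CHECK_EVERY == 0 and stop.value == 1:
                return 0
            if self.a % i == 0:
                stop.value = 1
                return 0
        return 1
</code></pre>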