<p>I did this with threads, using <a href="https://docs.python.org/2/library/queue.html#queue-objects" rel="nofollow noreferrer">a queue</a> for job management. This is the baseline; my full version has a bunch of <code>try/except</code> blocks (in particular in the worker, to make sure <code>q.task_done()</code> is called even when a job fails).</p>
<pre><code>from threading import Thread
from queue import Queue
import time
import random

def run(idx, *args):
    time.sleep(random.random() * 1)
    print(idx, ':', args)

def run_jobs(jobs, workers=1):
    q = Queue()

    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()

if __name__ == "__main__":
    run_jobs([('job', i) for i in range(10)], workers=5)
</code></pre>
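<p>The answer mentions wrapping the worker body in <code>try/except</code> so that <code>q.task_done()</code> always runs; a minimal sketch of that variant is below. The failure condition inside <code>run()</code> and the <code>completed</code>/<code>failed</code> result lists are illustrative additions, not part of the original code:</p>
<pre><code>import random
import time
from queue import Queue
from threading import Thread

def run(idx, *args):
    # Hypothetical job: fail on one input to exercise the finally path.
    if args and args[0] == 3:
        raise ValueError("job %r failed" % (args,))
    time.sleep(random.random() * 0.05)
    return args

def run_jobs(jobs, workers=1):
    completed, failed = [], []
    q = Queue()

    def worker(idx):
        while True:
            args = q.get()
            try:
                completed.append(run(idx, *args))
            except Exception as exc:
                failed.append((args, exc))
            finally:
                # Always acknowledge the job, even on failure;
                # otherwise q.join() below would block forever.
                q.task_done()

    for job in jobs:
        q.put(job)
    for i in range(workers):
        t = Thread(target=worker, args=[i], daemon=True)
        t.start()
    q.join()
    return completed, failed

if __name__ == "__main__":
    done, errors = run_jobs([(i,) for i in range(10)], workers=5)
    print(len(done), "completed,", len(errors), "failed")
</code></pre>
<p>Without the <code>finally</code>, a raising job would skip <code>task_done()</code> and <code>q.join()</code> would never return.</p>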
<p>I didn't need multiprocessing (my workers just invoke external processes), but this scales to it. The multiprocessing API changes things slightly; here is how to adapt:</p>
<pre><code>from multiprocessing import Process, Queue
from queue import Empty
import time
import random

def run(idx, *args):
    time.sleep(random.random() * 1)
    print(idx, ':', args)

def run_jobs(jobs, workers=1):
    q = Queue()

    def worker(idx):
        try:
            while True:
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

if __name__ == "__main__":
    run_jobs([('job', i) for i in range(10)], workers=5)
</code></pre>
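<p>If you don't need the per-worker index, the same fan-out can be written more compactly with <code>multiprocessing.Pool</code>, which handles worker startup, dispatch, and joining for you. A hedged alternative sketch, assuming each job is a <code>(label, number)</code> tuple as in the example above:</p>
<pre><code>import random
import time
from multiprocessing import Pool

def run(job):
    label, n = job  # assumed payload shape: ('job', i)
    time.sleep(random.random() * 0.05)
    return '%s-%d' % (label, n)

def run_jobs(jobs, workers=1):
    # Pool manages the worker processes; map() blocks until all
    # jobs finish and preserves input order in its results.
    with Pool(processes=workers) as pool:
        return pool.map(run, jobs)

if __name__ == "__main__":
    print(run_jobs([('job', i) for i in range(10)], workers=5))
</code></pre>
<p>Note that <code>Pool</code> requires the job function to be picklable (defined at module level), unlike the nested <code>worker</code> closure above, which relies on fork-style process creation.</p>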
<p>Both versions will output something like this:</p>
<pre><code>0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)
</code></pre>