<p>修复了我的多处理问题-并实际切换了线程。我不知道到底是什么修复了它的想法-我只是重新设计了所有的东西,让工人和任务,什么不做,事情现在都在飞。以下是我所做的基本工作:</p>
<pre><code>import abc
from Queue import Empty, Queue
from threading import Thread
class AbstractTask(object):
"""
The base task
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def run_task(self):
pass
class TaskRunner(object):
def __init__(self, queue_size, num_threads=1, stop_on_exception=False):
super(TaskRunner, self).__init__()
self.queue = Queue(queue_size)
self.execute_tasks = True
self.stop_on_exception = stop_on_exception
# create a worker
def _worker():
while self.execute_tasks:
# get a task
task = None
try:
task = self.queue.get(False, 1)
except Empty:
continue
# execute the task
failed = True
try:
task.run_task()
failed = False
finally:
if failed and self.stop_on_exception:
print('Stopping due to exception')
self.execute_tasks = False
self.queue.task_done()
# start threads
for i in range(0, int(num_threads)):
t = Thread(target=_worker)
t.daemon = True
t.start()
def add_task(self, task, block=True, timeout=None):
"""
Adds a task
"""
if not self.execute_tasks:
raise Exception('TaskRunner is not accepting tasks')
self.queue.put(task, block, timeout)
def wait_for_tasks(self):
"""
Waits for tasks to complete
"""
if not self.execute_tasks:
raise Exception('TaskRunner is not accepting tasks')
self.queue.join()
</code></pre>
<p>我所要做的就是创建一个TaskRunner并向其添加任务(数千个),然后调用wait_for_tasks()。所以,很明显,在我所做的重新架构中,我“修复”了我遇到的其他一些问题。不过很奇怪。在</p>