ThreadedWorkerQueue.add_worker()
方法将阻塞,直到Worker
被使用。是否有一个很好的设计允许在不阻塞调用~.add_worker()
的线程的情况下向ThreadedWorkerQueue
添加新的工作线程,但仍在使用条件?在
下面是一个简短的SSCCE:
import time
import threading
class Worker(object):
def work(self):
pass
class TimeWorker(Worker):
def __init__(self, seconds):
super(TimeWorker, self).__init__()
self.seconds = seconds
def work(self):
for i in xrange(self.seconds):
print "Working ... (%d)" % i
time.sleep(1)
class ThreadedWorkerQueue(threading.Thread):
def __init__(self):
super(ThreadedWorkerQueue, self).__init__()
self.condition = threading.Condition()
self.workers = []
self.running = False
def add_worker(self, worker):
with self.condition:
self.workers.append(worker)
self.condition.notify()
def stop(self):
with self.condition:
self.running = False
self.condition.notify()
self.join()
def consume(self):
if self.workers:
worker = self.workers.pop(0)
worker.work()
def run(self):
self.running = True
while True:
with self.condition:
if not self.running:
break
self.condition.wait()
self.consume()
def main():
queue = ThreadedWorkerQueue()
queue.start()
queue.add_worker(TimeWorker(3))
time.sleep(1)
tstart = time.time()
queue.add_worker(TimeWorker(2))
print "Blocked", time.time() - tstart, "seconds until worker was added."
queue.stop()
main()
好吧,所以我最初的想法是当线程 可以继续消耗工人。这是 生产者/消费者设计,跳过连续轮询,真正只做 有活干就干。在
刚才,我有个主意,使用一个默认获取的锁 当新工人可以被消耗时释放。但我不确定这是不是 做这件事的好方法。有人能发现问题吗(例如潜在的死锁)?在
完整的代码在GitHub上:https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750
^{pr2}$
你不需要条件。Python已经有了一个完美的机制:^{} 。将
ThreadedWorkerQueue.workers
从列表改为Queue
,您不需要担心条件、锁、通知等,这将大大简化您的代码。在您需要替换:
Queue
的工人列表Queue.put
Queue.get
然后去掉
with self.condition: ...
。在另外,从
stop()
内部调用self.join()
不是一个好的实践。把它留给主叫线程吧。如果你有几个线程需要停止,你应该先停止所有线程,然后再将它们全部连接起来。在相关问题 更多 >
编程相关推荐