Python线程,无阻塞生产

2024-09-27 00:15:46 发布

您现在位置:Python中文网/ 问答频道 /正文

ThreadedWorkerQueue.add_worker()方法将阻塞,直到Worker被使用。是否有一个很好的设计允许在不阻塞调用~.add_worker()的线程的情况下向ThreadedWorkerQueue添加新的工作线程,但仍在使用条件?在

下面是一个简短的SSCCE:

import time
import threading

class Worker(object):

    def work(self):
        pass

class TimeWorker(Worker):

    def __init__(self, seconds):
        super(TimeWorker, self).__init__()
        self.seconds = seconds

    def work(self):
        for i in xrange(self.seconds):
            print "Working ... (%d)" % i
            time.sleep(1)

class ThreadedWorkerQueue(threading.Thread):

    def __init__(self):
        super(ThreadedWorkerQueue, self).__init__()
        self.condition = threading.Condition()
        self.workers = []
        self.running = False

    def add_worker(self, worker):
        with self.condition:
            self.workers.append(worker)
            self.condition.notify()

    def stop(self):
        with self.condition:
            self.running = False
            self.condition.notify()
        self.join()

    def consume(self):
        if self.workers:
            worker = self.workers.pop(0)
            worker.work()

    def run(self):
        self.running = True
        while True:
            with self.condition:
                if not self.running:
                    break

                self.condition.wait()
                self.consume()

def main():
    queue = ThreadedWorkerQueue()
    queue.start()

    queue.add_worker(TimeWorker(3))
    time.sleep(1)

    tstart = time.time()
    queue.add_worker(TimeWorker(2))
    print "Blocked", time.time() - tstart, "seconds until worker was added."

    queue.stop()

main()

编辑

好吧,所以我最初的想法是当线程 可以继续消耗工人。这是 生产者/消费者设计,跳过连续轮询,真正只做 有活干就干。在

刚才,我有个主意,使用一个默认获取的锁 当新工人可以被消耗时释放。但我不确定这是不是 做这件事的好方法。有人能发现问题吗(例如潜在的死锁)?在

完整的代码在GitHub上:https://github.com/NiklasRosenstein/async/blob/73828ecaa2990a71b63caf93c32f9cce5ec11d27/async.py#L686-L750

^{pr2}$

Tags: selfaddtimequeueinitdefcondition线程
1条回答
网友
1楼 · 发布于 2024-09-27 00:15:46

... but still working with conditions?

你不需要条件。Python已经有了一个完美的机制:^{}。将ThreadedWorkerQueue.workers从列表改为Queue,您不需要担心条件、锁、通知等,这将大大简化您的代码。在

您需要替换:

  • 具有Queue的工人列表
  • 追加并通知Queue.put
  • wait+pop与Queue.get

然后去掉with self.condition: ...。在

另外,从stop()内部调用self.join()不是一个好的实践。把它留给主叫线程吧。如果你有几个线程需要停止,你应该先停止所有线程,然后再将它们全部连接起来。在

相关问题 更多 >

    热门问题