如何在不浪费太多cpu周期的情况下等待多线程队列不是空的

2024-05-11 18:03:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我想让一个线程等待多线程队列不为空。队列只有一个生产者和一个消费者。当任务可用时,生产者将任务放入队列中,但生产者必须等到收集到两个或多个任务。我之所以不两次使用get方法来检索两个任务,是因为它使算法流程过于复杂。但这不能在下面的片段中描述,因为显然这只是一个过于简单化的例子。在

我需要知道队列不是空的,这样我就可以比较队列的峰值(不删除它)和我刚刚用get删除的元素

睡眠是如何做到的:

while myQueue.empty():
    sleep(0.05)

我怎么能不使用睡眠呢?我应该使用event.wait()?如果是,我就不知道该如何正确地使用event.clear()命令。因为我要等待的线程也是消费者,所以我不能确定队列是否为空。即使我用queue.empty()来检查。在


Tags: 方法算法event元素get队列消费者流程
3条回答

只要myQueue.get(block=True)将阻塞您的线程(停止它的执行),直到有东西可以从队列中检索。当队列中有可用项时,此调用将返回该项。您可以添加一个超时,以防在队列从未被馈送时退出。在

https://docs.python.org/3/library/queue.html#queue.Queue.get。在

实际上,似乎您需要实现Queue.peek()方法,该方法将返回队列中的下一个元素,而不实际删除它。在

此方法在标准队列对象中不可用,但您可以继承和扩展它而不会出现问题:

from Queue import Queue
class VoyeurQueue(Queue):
    def peek(self, block=True, timeout=None):
        # ...

现在对于新的peek()方法的内容,您可以简单地复制并粘贴基础Queue对象的get()方法的内容。如果您在Linux上,您可以在/usr/lib/python?.?/Queue.py找到它,如果您在Windows上,可以在%PYTHONPATH%/lib/Queue.py找到它(不确定后者,因为我当前在Linux机器上,无法检查)。在我的python2.7版本中,get()方法实现为:

^{pr2}$

现在,为了区别。您不希望移除元素,因此我们定义以下内容代替_get()

def _peek(self):
    return self.queue[0]

peek()方法中,我们仍然使用self.not_empty条件,但不再需要{}。因此,生成的代码将如下所示:

from Queue import Queue

class VoyeurQueue(Queue):

    def peek(self, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._peek()
            return item
        finally:
            self.not_empty.release()

    def _peek(self):
        return self.queue[0]

您可以使用初始化为零的信号量,并与队列并行。例如mySemaphore = threading.Semaphore(0)。默认情况下,调用mySempahore.acquire()的线程将被阻塞,因为信号量为零而不接触队列。然后,当您在队列中放入一些东西时,您可以调用mySemaphore.release(),它将允许一个线程执行(假设使用下一个循环)。在

相关问题 更多 >