Python线程:我缺少什么?(task_done()调用次数过多)

2024-09-24 22:30:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我为前面那篇冗长的帖子道歉。希望它能为解决方案提供足够的上下文。我试图创建一个实用程序函数,它将获取任意数量的旧classmethod并将它们放入多线程队列中:

class QueuedCall(threading.Thread):

    def __init__(self, name, queue, fn, args, cb):
        threading.Thread.__init__(self)
        self.name = name

        self._cb = cb
        self._fn = fn
        self._queue = queue
        self._args = args

        self.daemon = True
        self.start()

    def run(self):
        r = self._fn(*self._args) if self._args is not None \
            else self._fn()

        if self._cb is not None:
            self._cb(self.name, r)

            self._queue.task_done()

下面是我的调用代码(在类中)的外观

data = {}
def __op_complete(name, r):
    data[name] = r

q = Queue.Queue()

socket.setdefaulttimeout(5)

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
q.put(QueuedCall('so_answers', q, StackExchange.get_answers,
    ['api.stackoverflow.com', 534476, 5], __op_complete))
q.put(QueuedCall('so_user', q, StackExchange.get_user_info,
    ['api.stackoverflow.com', 534476], __op_complete))
q.put(QueuedCall('p_answers', q, StackExchange.get_answers,
    ['api.programmers.stackexchange.com', 23901, 5], __op_complete))
q.put(QueuedCall('p_user', q, StackExchange.get_user_info,
    ['api.programmers.stackexchange.com', 23901], __op_complete))
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete))

q.join()
return data

我在这里遇到的问题是,在重新启动服务器时,它似乎每隔一次就工作,但每秒钟或第三次请求都失败,并出现错误:

ValueError: task_done() called too many times

这个错误每隔一秒或第三次请求就会出现在一个随机线程中,所以很难准确地确定问题所在。

有人有什么想法和/或建议吗?

谢谢。


编辑:

我已经添加了prints来调试这个(快速和脏的而不是日志记录)。一个print语句(print 'running thread: %s' % self.name)在run的第一行,另一个在调用task_done()print 'thread done: %s' % self.name)之前。

成功请求的输出:

running thread: twitter
running thread: so_answers
running thread: so_user
running thread: p_answers
thread done: twitter
thread done: so_user
running thread: p_user
thread done: so_answers
running thread: fb_image
thread done: p_answers
thread done: p_user
thread done: fb_image

不成功请求的输出:

running thread: twitter
running thread: so_answers
thread done: twitter
thread done: so_answers
running thread: so_user
thread done: so_user
running thread: p_answers
thread done: p_answers
Exception in thread p_answers:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run
    self._queue.task_done()
  File "/usr/lib/python2.7/Queue.py", line 64, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

running thread: p_user
thread done: p_user
running thread: fb_image
thread done: fb_image

Tags: nameselftaskgetsoqueueputthread
2条回答

你解决这个问题的方法是“非传统的”。但暂时不理睬。。。问题很简单,在您给出的代码中

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))

很明显,以下工作流是可能发生的

  1. 线程由QueuedCall构造并启动
  2. 然后它被放入队列q中。。。在队列完成插入项的逻辑之前,独立线程已经完成了工作,并尝试调用q.task_done()。这会导致出现错误(在将对象安全放入队列之前调用了task_done())

应该怎么做?您不会将线程插入队列。队列保存线程处理的数据。所以相反你

  • 创建队列。插入您想要完成的it作业(例如函数、它们想要的参数和回调)
  • 创建并启动工作线程
  • 工作线程调用
    • q、 get()以获取要调用的函数
    • 调用它
    • 调用q.task_done()让队列知道该项已被处理。

我在这里可能有误解,但我不确定您是否正确使用了Queue

从对文档的简要调查来看,您的想法是可以使用put方法将工作放入Queue中,然后另一个线程可以调用get以从中获得一些工作,完成工作,完成后再调用task_done

代码似乎要做的是将put实例QueuedCall放入队列中。从队列中什么也没有,get,但是QueuedCall实例也被传递一个引用到它们被插入的队列中,并且它们执行它们的工作(它们本质上知道,不是因为它们从队列中get),然后调用task_done

如果我对所有这些的理解都是正确的(你没有从我看不到的其他地方调用get方法),那么我相信我理解这个问题。

问题是,必须先创建QueuedCall实例,然后才能将其放入队列,创建实例的行为将在另一个线程中开始工作。如果线程在主线程成功地putQueuedCall放入队列之前完成其工作并调用task_done,则可以得到所看到的错误。

我认为只有当你第一次偶然地运行它时,它才起作用。GIL“帮助”您很多;不太可能QueuedCall线程实际获得GIL并立即开始运行。事实上,除了作为计数器之外,您实际上并不关心队列,这似乎也“有助于”工作:只要队列不是空的,那么QueuedCall是否还没有到达队列就无关紧要(这个QueuedCall只能task_done队列中的另一个元素,当元素调用task_done时,这一个可能会在队列中,并且它可以被标记为已完成)。添加sleep也会使新线程稍等片刻,从而使主线程有时间确保它们确实在队列中,这也是掩盖问题的原因。

另外请注意,根据我对交互式shell的一些快速修改,您的队列在最后实际上仍然是满的,因为您从来没有真正地get任何东西。它刚刚收到了一些task_done消息,相当于其中的put个数,因此join起作用。

我认为您需要彻底重新设计您的QueuedCall类的工作方式,或者使用与Queue不同的同步原语。设计的Queue用于对已存在的工作线程的工作进行排队。从放在队列中的对象的构造函数中启动线程并不是很合适。

相关问题 更多 >