我创建了自己的线程类,将在线程池中使用,如下所示:
class SendFilesThread(threading.Thread):
execute = True
def __init__(self, port, rate, fileQueue):
self.daemon = True
self.fileQueue = fileQueue
#Other initialization
def run(self):
while SendFilesThread.execute == True or not self.fileQueue.empty():
self.filename = self.fileQueue.get()
#some file processing
self.fileQueue.task_done()
@staticmethod
def terminate():
SendFilesThread.execute = False
在程序的主线程中,处理完所有文件后,我尝试关闭线程池,如下所示:
^{pr2}$我的理解是,如果我调用join()
,它将阻止调用线程(在本例中是主线程)继续,直到连接的线程完成处理。我的问题是,尽管线程完成了,它永远不会返回到主线程,程序只是挂起。关闭线程池时我是否做错了什么?在
有两件事。首先,从设计的角度来看,
execute
标志在类级别有点奇怪。通常最好在每个类的实例上设置一个标志。在第二,你有比赛条件。
execute
标志由多个线程访问,不受同步原语(如互斥锁)的保护。这意味着terminate()
调用可以在之后运行(并设置标志),任何工作线程都会检查该标志,但在之前,该线程会尝试将下一个文件名出列。因为您没有超时地调用get()
,所以工作线程将挂在这里,主线程将在t.join()
调用中阻塞。僵局接踵而至。在有很多方法可以解决这个问题。可以使用线程同步原语(如互斥体)来保护
execute
标志,也可以使用threading.Event
对象来代替简单的布尔值。在另一个在我看来更简单的解决方案是在同一个队列上发送一个“sentinel”值,这表示线程应该退出。看起来您发送的是字符串文件名,因此空字符串可能是一个不错的选择。(
None
也常用。)现在每个线程的工作循环如下所示:
主线程不是
terminate()
静态方法,而是为每个工作线程放置一个空字符串(即fileQueue.enqueue('')
)。在如果},那么它的
fileQueue
是multiprocessing.Queue
或{.get()
方法将阻塞,直到队列中有东西到达。尝试使用.get_nowait()
,但如果您希望即使队列为空时也继续执行,则可能必须将其包装在Try块中。在有关详细信息,请参见queue docs
相关问题 更多 >
编程相关推荐