Python线程偶尔会返回失败

2024-10-01 00:25:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个多线程的Python脚本,它获取一个文件名列表并将它们放入队列。大多数时候它都能工作,但是我偶尔会发现它卡住了,并且“ps-efL”会显示两个线程为python脚本打开。我用strace跟踪它,6个线程中有5个线程返回,但有一个线程只是挂在futex等待永远。你知道吗

下面是有问题的代码块。你知道吗

threads = 6 

for fileName in fileNames:
  queue.put(fileName)

for i in range(threads):
  t = threading.Thread(target=get_backup_list, args=(queue,dbCreds,arguments.verbose,arguments.vault))
  activeThreads.append(t)
  t.start()

for activeThread in activeThreads:
  activeThread.join()


def get_backup_list(queue,dbCreds,verbosity,vault):
  backupFiles = []

  while True:
    if queue.empty() == True:
      return 
    fileName = queue.get()
    try:
      fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
      if not fileInfo:
        start = time.time()
        attributes = get_attributes(fileName,verbosity)
        end = time.time() - start
        if verbosity: print("finished in ") + str(end) + (" seconds")
        insert_file(attributes,dbCreds,vault)
        fileInfo = lookup_file_by_path(fileName,dbCreds,vault)

    except Exception, e:
      print("error on " + fileName + " " + str(e))

  return

def lookup_file_by_path(path,dbCreds,vault):
  attributes = {}
  conn = mdb.connect(dbCreds['server'] , dbCreds['user'], dbCreds['password'], dbCreds['database'], cursorclass=MySQLdb.cursors.DictCursor);
  c = conn.cursor()
  c.execute('''SELECT * FROM {} where path = "%s" '''.format(vault) % ( path ) )
  data = c.fetchone()
  if data:
    for key in data.keys():
      attributes[key] = data[key]
  conn.close
  return attributes

我是不是做错了什么导致了比赛状态?或者我还缺什么。你知道吗

谢谢你, 托马斯C


Tags: pathinfordatagetiftimequeue
1条回答
网友
1楼 · 发布于 2024-10-01 00:25:29

您的代码中存在竞争条件:

while True:
     if queue.empty() == True:
        return 
     fileName = queue.get()

首先,线程检查队列是否为空。如果不是,则尝试阻塞get。但是,在调用queue.empty()queue.get之间,另一个线程可能已经消耗了队列中的最后一项,这意味着get调用将永远阻塞。您应该这样做:

try:
    fileName = queue.get_nowait()
except Queue.Empty:
    return

如果这还不能解决问题,您可以将一些print语句扔到线程化方法中,以准确地确定它卡住的地方,然后从那里开始。但是,没有其他并发问题向我袭来。你知道吗

编辑:

另一方面,您在这里所做的可以更干净地实现为ThreadPool^{}

from multiprocessing.pool import ThreadPool
from functools import partial

def get_backup_list(dbCreds, verbosity, vault, fileName):
  backupFiles = []
  fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
  ...

if __name__ == "__main__":
    pool = ThreadPool(6) # You could use a multiprocessing.Pool, too
    func = partial(get_backup_list, dbCreds, arguments.verbose, arguments.vault)
    pool.map(func, fileNames)
    pool.close()
    pool.join()

根据每个对get_backup_list的调用所做的工作,您可能会发现它作为multiprocessing.Pool执行得更好,因为它能够绕过全局解释器锁(Global Interpreter Lock,GIL),这会阻止Python线程在CPU核之间并发执行。不过,看起来您的代码可能是I/O绑定的,所以ThreadPool可能就可以了。你知道吗

相关问题 更多 >