"将多进程作业的中间结果储存在文件系统中,并在稍后继续进行处理"

2024-10-04 07:27:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个使用multiprocessing包并通过调用函数的作业

resultList = pool.map(myFunction, myListOfInputParameters)。在

输入参数列表中的每个条目都独立于其他条目。在

这项工作需要几个小时。出于安全考虑,我希望以固定的时间间隔(例如,每小时一次)存储中间的结果。在

当作业中止并希望根据上次可用备份重新启动时,如何才能执行此操作并继续处理?在


Tags: map列表参数间隔作业时间条目备份
2条回答

至少有两种可能的选择。在

  1. myFunction的每个调用将其输出保存到一个唯一命名的文件中。文件名应基于或链接到输入数据。使用父程序收集结果。在这种情况下,myFunction应该返回已完成项的标识符。在
  2. 使用imap_unordered代替map。这将在结果可用时立即开始生成结果,而不是在所有处理完成后再返回。让父程序保存返回的数据并指出哪些项目已完成。在

在这两种情况下,程序都必须检查以前运行时保存的数据,以便在重新启动时调整myListOfInputParameters。在

哪个选项最好在很大程度上取决于myFunction返回的数据量。如果这是一个很大的数额,有一个巨大的开销与转移回母公司。在这种情况下,选择1可能是最好的。在

由于写入磁盘的速度相对较慢,使用选项2时计算速度可能会更快。父程序更容易跟踪进度。在

请注意,您还可以将imap_unordered与选项1一起使用。在

也许用泡菜。阅读更多信息:

https://docs.python.org/3/library/pickle.html

基于aws_学徒的评论,我创建了一个完整的多处理示例,以防您不确定如何使用中间结果。第一次运行时,它将打印“无”,因为没有中间结果。再次运行它以模拟重新启动。在

from multiprocessing import Process
import pickle

def proc(name):
  data = None

  # Load intermediate results if they exist
  try:
    f = open(name+'.pkl', 'rb')
    data = pickle.load(f)
    f.close()
  except:
    pass

  # Do something
  print(data)
  data = "intermediate result for " + name

  # Periodically save your intermediate results
  f = open(name+'.pkl', 'wb')
  pickle.dump(data, f, -1)
  f.close()

processes = []
for x in range(5):
  p = Process(target=proc, args=("proc"+str(x),))
  p.daemon = True
  p.start()
  processes.append(p)

for process in processes:
  process.join()

for process in processes:
  process.terminate()

如果有必要,也可以使用json以人类可读的格式输出中间结果。或者sqlite作为数据库,如果您需要将数据推送到行中。在

相关问题 更多 >