我有一些Python代码,它们使用ThreadPoolExecutor将昂贵的作业分包出去,我想跟踪哪些作业已经完成,这样,如果需要重新启动系统,就不必重做已经完成的工作。在单线程上下文中,我可以在书架上标记我所做的。下面是一个简单的多线程环境移植:
from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve
def do_thing(done, x):
# Don't let the command run in the background; we want to be able to tell when it's done
_ = subprocess.check_output(["some_expensive_command", x])
done[x] = True
futs = []
with shelve.open("done") as done:
with ThreadPoolExecutor(max_workers=18) as executor:
for x in things_to_do:
if done.get(x, False):
continue
futs.append(executor.submit(do_thing, done, x))
# Can't run `done[x] = True` here--have to wait until do_thing finishes
for future in futs:
future.result()
# Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
# before we get through all of things_to_do
我能逃脱惩罚吗?documentation for shelve不包含任何关于线程安全的保证,所以我认为没有
那么处理这个问题的简单方法是什么呢?我想也许把done[x] = True
粘在future.add_done_callback
中就可以了,但是that will often run in the same thread as the future itself。也许有一种锁机制可以很好地与ThreadPoolExecutor配合使用?在我看来,编写一个休眠的循环,然后检查已完成的未来,这似乎更干净。在
当您仍在最外层的
with
上下文管理器中时,done
shelve只是一个普通的python对象-只有在上下文管理器关闭并运行其__exit__
方法时,才会将其写入磁盘。因此,由于GIL(只要使用CPython),它与其他python对象一样是线程安全的。在具体地说,重新分配
done[x] = True
是线程安全的/将以原子方式完成。在需要注意的是,虽然shelve的
__exit__
方法将在Ctrl-C之后运行,但如果python进程突然结束,则不会运行,并且shelve不会保存到磁盘上。在为了防止这种失败,我建议使用轻量级的基于文件的线程安全数据库,比如sqllite3。在
相关问题 更多 >
编程相关推荐