Python中ThreadPoolExecutor上下文中的轻度持久性

2024-04-24 00:42:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一些Python代码,它们使用ThreadPoolExecutor将昂贵的作业分包出去,我想跟踪哪些作业已经完成,这样,如果需要重新启动系统,就不必重做已经完成的工作。在单线程上下文中,我可以在书架上标记我所做的。下面是一个简单的多线程环境移植:

from concurrent.futures import ThreadPoolExecutor
import subprocess
import shelve


def do_thing(done, x):
    # Don't let the command run in the background; we want to be able to tell when it's done
    _ = subprocess.check_output(["some_expensive_command", x])
    done[x] = True


futs = []
with shelve.open("done") as done:
    with ThreadPoolExecutor(max_workers=18) as executor:
        for x in things_to_do:
            if done.get(x, False):
                continue
            futs.append(executor.submit(do_thing, done, x))
            # Can't run `done[x] = True` here--have to wait until do_thing finishes
        for future in futs:
            future.result()

    # Don't want to wait until here to mark stuff done, as the whole system might be killed at some point
    # before we get through all of things_to_do

我能逃脱惩罚吗?documentation for shelve不包含任何关于线程安全的保证,所以我认为没有

那么处理这个问题的简单方法是什么呢?我想也许把done[x] = True粘在future.add_done_callback中就可以了,但是that will often run in the same thread as the future itself。也许有一种锁机制可以很好地与ThreadPoolExecutor配合使用?在我看来,编写一个休眠的循环,然后检查已完成的未来,这似乎更干净。在


Tags: thetoruninimporttrueforas
1条回答
网友
1楼 · 发布于 2024-04-24 00:42:58

当您仍在最外层的with上下文管理器中时,doneshelve只是一个普通的python对象-只有在上下文管理器关闭并运行其__exit__方法时,才会将其写入磁盘。因此,由于GIL(只要使用CPython),它与其他python对象一样是线程安全的。在

具体地说,重新分配done[x] = True是线程安全的/将以原子方式完成。在

需要注意的是,虽然shelve的__exit__方法将在Ctrl-C之后运行,但如果python进程突然结束,则不会运行,并且shelve不会保存到磁盘上。在

为了防止这种失败,我建议使用轻量级的基于文件的线程安全数据库,比如sqllite3。在

相关问题 更多 >