如何确保同一管道不会同时执行两次

2024-09-26 22:49:53 发布

您现在位置:Python中文网/ 问答频道 /正文

嘿:)我有一个关于锁定或互斥行为的问题

场景

让我们假设以下场景:

  1. 管道正在处理一些本地文件。这些文件由CI-CD作业放置。处理后,我想删除这些文件。如果作业花费的时间超过计划间隔,这将导致竞争条件
  2. 两条管道资源非常丰富,因此无法并行运行

可能的解决方案

  • 目前,我会在正在运行的服务中使用某种互斥或锁,其中管道可以注册并允许执行或不执行
  • 复制数据以确保每个工作流都可以清理和使用自己的数据
  • 创建一个本地锁文件,并确保成功后将删除该文件
  • 创建一个较小的计划间隔,并检查是否存在锁。如果条件未满足,则干净地退出

我知道这可能不是dagster的正常用例,但我也希望将dagster用于其他工作流,例如清理任务和其他管道的触发器

谢谢


Tags: 文件数据ci间隔管道作业时间场景
3条回答

感谢分享您的用例。我认为Dagster目前不支持这些特性。但是,0.10.0版本(几个月后)将包括运行级队列,允许您对并发管道运行进行限制。目前,它只支持全局限制运行,但很快将支持添加基于管道标记的规则(例如,标记为“资源密集型”的管道可以限制为3次并发运行)。看起来这可能适合这个用例

预览当前排队系统的指南是here。也请在@johann的Dagster slack上随时联系我

对于场景#2(处理资源非常密集且无法并行运行的管道)的建议是使用dagster的Celery集成,例如celery_executorcelery_docker_executorcelery_k8s_job_executor(如果您在kubernetes上)

这些工作方式是Dagster管道运行协调器将每个solid执行任务添加到芹菜队列,芹菜允许您限制每个队列中活动任务的数量。例如,这通常用于确保在给定时间只有X个实体连接到红移

Dagster还支持使用多个队列,因此您可以为资源密集型实体创建一个队列,为非资源密集型实体创建另一个队列(具有更高的并发限制)

关于场景1,我不确定您的设计约束是什么。一个想法是使用管道运行标记的标记方案来跟踪哪个管道运行对应于哪个文件;然后,对于每个文件,执行文件清理的进程首先验证删除之前是否存在成功的管道运行(通过查询运行数据库)

我不熟悉dagster,但我在其他上下文中成功使用的一种机制是利用这样一个事实,即在类似Unix的系统中,rename或mv是一种原子操作。对于运行后清理的第一个要求:

  1. 新文件被放入输入目录。一组输入文件可以在它们自己的目录中隔离

  2. 当管道进程启动时,它的第一个操作是从输入目录中选择一个文件(或目录),并将其移动到管道实例拥有的工作目录中。如果输入目录中没有可用的文件,进程会自动关闭

  3. 如果mv成功,进程将继续在它刚刚移动到其工作目录的文件(目录)上执行它的操作。当它完成时,它会自行清理,可能是通过对其工作目录执行递归删除

  4. 如果mv失败,则意味着另一个进程从该进程下取出了新文件。失败的过程会优雅地自行关闭

对于一次只运行一个管道进程的第二个要求,您可以使用独占方式创建sentinel文件,如果该进程没有成功创建sentinel文件,则该进程将失败并退出。在Python3中,代码可能类似于

try:
    open('sentinel', 'x').close()
except FileExistsError:
    exit("someone else already has sentinel")

do_stuff()

os.remove('sentinel')

当然,如果您的进程在do_stuff()中的某个地方崩溃,您必须手动清理sentinel文件,或者您可以使用atexit处理程序确保即使在do_stuff()中崩溃的情况下也删除sentinel

相关问题 更多 >

    热门问题