pythonluigi满意后继续执行外部任务

2024-09-29 18:52:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在研究一个Luigi管道,它检查手动创建的文件是否存在,如果存在,则继续执行以下任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

我想要的是,在我创建了手动文件并将其粘贴到路径中之后,luigi将继续。执行此操作时,它将每隔几秒钟重新检查一次新任务,而不是查找文件并继续执行任务:

^{pr2}$

在相当长的时间(15-20分钟左右)之后,luigi将找到该文件,然后它就可以按需要继续。我能做些什么来防止这种延误?我希望luigi在文件存在后继续。在


Tags: 文件pathselftaskreturnparameterosdef
1条回答
网友
1楼 · 发布于 2024-09-29 18:52:21

请记住以下几点:

  1. Luigi工作线程在至少有一个任务正在运行之前不会退出(或者如果keep_alive = True,在这种情况下,它将在没有更多挂起的任务时退出)。在
  2. 失败的任务有重试逻辑,默认重试间隔为15分钟。在
  3. 重试逻辑的工作原理如下。在指定的重试间隔之后,调度程序将忘记任务的失败(与单击UI中的“原谅失败”按钮相同),并将任务的状态更改为挂起。下次工作线程向调度程序请求工作时,可以将此任务分配给工作线程。在
  4. 不完整的外部任务视为失败,取决于重试逻辑。在
  5. 外部任务的重试行为由retry_external_tasks节中的retry_external_tasks配置设置控制。在

我想你看到的是这样的。您的管道正在运行,任务ProcessExternalFile失败,然后您添加文件,任务在retry_delay期间保持失败,最后它变为挂起状态,并且再次向工作线程授予此任务,此时它发现了文件,任务就完成了。在

这是否是你想要的行为取决于你。如果希望更快地找到文件,可以更改重试间隔。或者您可以在run方法中执行一个无限的while循环,并定期检查该文件,当找到该文件时就退出循环。您还可以将Luigi配置为完全禁用重试逻辑。在

相关问题 更多 >

    热门问题