我正在研究一个Luigi管道,它检查手动创建的文件是否存在,如果存在,则继续执行以下任务:
import luigi, os
class ExternalFileChecker(luigi.ExternalTask):
task_namespace='MyTask'
path = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))
class ProcessExternalFile(luigi.Task):
task_namespace='MyTask'
path = luigi.Parameter()
def requires(self):
return ExternalFileChecker(path=self.path)
def output(self):
dirname = self.path
outfile = os.path.join(dirname, 'processedfile.txt')
return luigi.LocalTarget(outfile)
def run(self):
#do processing
if __name__ == '__main__':
path = r'D:\MyPath\luigi'
luigi.run(['MyTask.ProcessExternalFile','--path', path,\
'--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
'--worker-keep-alive'])
我想要的是,在我创建了手动文件并将其粘贴到路径中之后,luigi将继续。执行此操作时,它将每隔几秒钟重新检查一次新任务,而不是查找文件并继续执行任务:
^{pr2}$在相当长的时间(15-20分钟左右)之后,luigi将找到该文件,然后它就可以按需要继续。我能做些什么来防止这种延误?我希望luigi在文件存在后继续。在
请记住以下几点:
keep_alive = True
,在这种情况下,它将在没有更多挂起的任务时退出)。在retry_external_tasks
节中的retry_external_tasks
配置设置控制。在我想你看到的是这样的。您的管道正在运行,任务
ProcessExternalFile
失败,然后您添加文件,任务在retry_delay
期间保持失败,最后它变为挂起状态,并且再次向工作线程授予此任务,此时它发现了文件,任务就完成了。在这是否是你想要的行为取决于你。如果希望更快地找到文件,可以更改重试间隔。或者您可以在
run
方法中执行一个无限的while
循环,并定期检查该文件,当找到该文件时就退出循环。您还可以将Luigi配置为完全禁用重试逻辑。在相关问题 更多 >
编程相关推荐