import luigi
class FileToStaging(ImportToTable):
filename = Luigi.Parameter(default = '')
#import file from some folder to a staging database
def requires(self):
return luigi.LocalTarget(self.filename)
#truncate table
#load the file into staging
class StgToOfficial(RunQuery):
filename = Luigi.Parameter
# run a process in the database to load data from staging to the final table
def requires(self):
return FileToStaging(self.filename)
# run query
class LoadFileGroups(luigi.WrapperTask):
def requires(self):
list_of_files = get_list_of_files_currently_in_folder() # The folder can have an arbitrary number of files inside
for file in list_of_files:
yield(StgToOfficial(filename = file))
你好,社区
我是Luigi的新手,正在尝试使用该框架构建ETL流程
假设我有一个与前面的伪代码片段类似的过程。进程必须检查文件夹并获取其中的文件列表。然后,逐个导入到staging数据库,并运行一个进程将staging中的数据加载到最终表中
问题是,在前面的解决方案中,加载到临时表中的所有文件(随后是每个文件的加载过程)都是并行运行的,这是不可能发生的。如何强制Luigi按顺序执行任务?仅当文件在最终表中完成加载时,才导入下一个文件,依此类推。(查看下面的草稿以获得简化草稿)
Draft of the structure I'm trying to achieve
我知道我应该使用requires方法来确保顺序,但是对于要加载的未知数量的文件,我如何才能以友好的方式进行呢
事先非常感谢您的帮助
根据以下讨论中Peter Weissbrod的答案,通过在requires()方法中创建递归模式解决: https://groups.google.com/g/luigi-user/c/glvU_HxYmr0/m/JvV3xgsiAwAJ
以下是Peter提出的解决方案:
相关问题 更多 >
编程相关推荐