我尝试动态创建Luigi任务(基于cmdList中的属性),并使前一个任务成为下一个任务的依赖项。SQLTask是路易吉。任务. 但是,当我运行这段代码时,我得到This progress looks :| because there were tasks that were not granted run permission by the scheduler
我错过了什么?在
class BDX_Task(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter(default=None)
queryKey = luigi.Parameter()
queryCmd = luigi.Parameter()
runDesc = luigi.Parameter()
dependQry = luigi.Parameter()
def run(self):
print(subprocess.call(self.queryCmd, shell=True))
self.get_target().touch()
def dep_s_dep(cmdList, dep1):
"""
This returns dependency task's dependency
"""
dep2 = [(key,cmd,dep) for key, cmd, dep in cmdList if key==dep1]
return dep2[0]
class BDX_Query_0XX(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter()
runDesc = luigi.Parameter()
depend_task = ""
def run(self):
YY = self.acctDate[:4]
MM = self.acctDate[4:6]
acctDate = self.acctDate
ssisDate = self.ssisDate
runDesc = self.runDesc
bdx_sql = r'r:\\1.SQL\\BDX_SQL\\'
cmdList = [
('BDX010',f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt',None),
('BDX020',f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt','BDX010'),
('BDX022a',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt','BDX020'),
('BDX022b',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}','BDX022a'),
('BDX022c',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
('BDX023',f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ','BDX020'),
('BDX024',f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt','BDX020'),
('BDX025a',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt','BDX020'),
('BDX025b',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}','BDX025a'),
('BDX025c',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}','BDX025b')
]
tasks = []
for queryKey, queryCmd, dependQry in cmdList:
class_name = queryKey
klass = type(queryKey, (BDX_Task,),{}) # {'acctDate': self.acctDate, 'queryKey': queryKey, 'queryCmd': queryCmd, 'runDesc': self.runDesc, 'dependQry': dependQry})
if dependQry != '':
dep1 = dep_s_dep(cmdList, dependQry) # info about dependency task (key, cmd, dep's dep)
print(f"{queryKey}'s dep1", dep1)
depend_task = [globals()[dependQry](acctDate=self.acctDate,
ssisDate=self.ssisDate,
queryKey=dep1[0],
queryCmd=dep1[1],
runDesc=self.runDesc,
dependQry=dep1[2])]
def requires1(cls):
return depend_task
setattr(klass, "requires", classmethod(requires1))
globals()[queryKey] = klass # make the class available at the module level
tasks.append(globals()[queryKey](acctDate=self.acctDate, ssisDate =self.ssisDate, queryKey = queryKey, queryCmd = queryCmd, runDesc = self.runDesc, dependQry = dependQry)) # this addes Task class named after queryKey to dependency
yield tasks
self.get_target().touch()
==========堆栈跟踪
^{pr2}$
我猜是这样的:
Parameter "task_process_context" with value "None" is not of type string.
显示某些任务没有检索到预期的输出。
在这种情况下,luigi认为任务没有完成。在
确保所有任务都返回定义的类型(在下面的任务中)作为输入。
在您的例子中,
None
正在中断流水线执行,请确保这些任务返回str
,以防失败/没有要返回的数据,而不是None
。在使用emtpy字符串
""
或用于保存空依赖项的关键字字符串:"empty"
我觉得你让事情变得更复杂了。首先,当任务的完整列表直到运行时才知道时,动态依赖项是有用的。例如,您可能需要运行一个任务来查询数据库,而对于查询返回的每一行,您都需要一个新的依赖关系。在
这与以编程方式创建一组任务及其依赖项明显不同,这就是您在示例中所做的感觉。在
下面的玩具代码展示了如何实现您要做的事情:
产生的输出
^{pr2}$与代码的主要结构差异在于,my
task_list
是在任务类之外定义的。有可能您简化了SO,并且您的cmdList
实际上是另一个任务的输出,不能在类之外定义。您可以通过将列表添加到globals()
来解决这个问题,或者您可以将完整的命令列表作为一个参数传递给MyTask
/BDX_Task
,以便在MyTask.requires()
中引用它(如果列表可能很大,可能不是最好的主意)。而且,正如您最初所做的那样,您不能像我的示例那样使用luigi.WrapperTask
。在相关问题 更多 >
编程相关推荐