回答此问题可获得 20 贡献值,回答如果被采纳可获得 50 分。
<p>我尝试动态创建 Luigi 任务(基于 cmdList 中的属性),并使前一个任务成为下一个任务的依赖项。SQLTask 是一个 luigi.Task。但是,当我运行这段代码时,我得到 <code>This progress looks :| because there were tasks that were not granted run permission by the scheduler</code>
我错过了什么?</p>
<pre><code>class BDX_Task(SQLTask):
acctDate = luigi.Parameter()
ssisDate = luigi.Parameter(default=None)
queryKey = luigi.Parameter()
queryCmd = luigi.Parameter()
runDesc = luigi.Parameter()
dependQry = luigi.Parameter()
def run(self):
print(subprocess.call(self.queryCmd, shell=True))
self.get_target().touch()
def dep_s_dep(cmdList, dep1):
    """Return the ``(key, cmd, dep)`` entry of ``cmdList`` whose key is ``dep1``.

    This is used to look up a dependency task's own command and dependency.

    Args:
        cmdList: iterable of 3-item ``(key, cmd, dep)`` entries.
        dep1: the key to search for.

    Returns:
        A tuple ``(key, cmd, dep)`` for the first entry whose key equals
        ``dep1``.

    Raises:
        IndexError: if no entry in ``cmdList`` has key ``dep1``.
    """
    for key, cmd, dep in cmdList:
        if key == dep1:
            # Stop at the first match instead of scanning the whole list.
            return (key, cmd, dep)
    raise IndexError(f"no entry with key {dep1!r} in cmdList")
class BDX_Query_0XX(SQLTask):
    """Build one ``BDX_Task`` subclass per query and yield them as dynamic deps.

    Each entry of the command list is ``(key, command, dependency key)``.
    A class named after the key is created for every entry, registered in
    the module globals, and instantiated; when the entry names a dependency,
    the generated class gets a ``requires`` that returns that dependency's
    task.

    Parameters:
        acctDate: accounting date string; the first four characters are used
            as the year and the next two as the month.
        ssisDate: date string forwarded unchanged to every generated task.
        runDesc: run description forwarded unchanged to every generated task.
    """

    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()
    runDesc = luigi.Parameter()
    depend_task = ""

    @staticmethod
    def _requires_returning(dep_tasks):
        """Return a ``requires`` function bound to this exact ``dep_tasks``."""

        def requires1(cls):
            return dep_tasks

        return requires1

    def run(self):
        """Create the per-query tasks, yield them, then touch the target."""
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        bdx_sql = r'r:\\1.SQL\\BDX_SQL\\'
        cmdList = [
            ('BDX010', f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt', None),
            ('BDX020', f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt', 'BDX010'),
            ('BDX022a', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt', 'BDX020'),
            ('BDX022b', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}', 'BDX022a'),
            ('BDX022c', f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
            ('BDX023', f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ', 'BDX020'),
            ('BDX024', f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt', 'BDX020'),
            ('BDX025a', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt', 'BDX020'),
            ('BDX025b', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}', 'BDX025a'),
            ('BDX025c', f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}', 'BDX025b'),
        ]
        tasks = []
        for queryKey, queryCmd, dependQry in cmdList:
            klass = type(queryKey, (BDX_Task,), {})
            # Root entries carry None (or '') as their dependency; looking
            # those up would fail, so only wire requires() for real keys.
            if dependQry:
                # Info about the dependency task: (key, cmd, dep's dep).
                dep1 = dep_s_dep(cmdList, dependQry)
                print(f"{queryKey}'s dep1", dep1)
                # The dependency's class was registered in globals() by an
                # earlier iteration, because cmdList lists it first.
                depend_task = [globals()[dependQry](acctDate=self.acctDate,
                                                    ssisDate=self.ssisDate,
                                                    queryKey=dep1[0],
                                                    queryCmd=dep1[1],
                                                    runDesc=self.runDesc,
                                                    dependQry=dep1[2])]
                # Bind the list through a factory: a closure over the loop
                # variable would make every class return the LAST dependency
                # once the scheduler calls requires() after the loop ends.
                klass.requires = classmethod(self._requires_returning(depend_task))
            # Make the class available at the module level.
            globals()[queryKey] = klass
            # Add an instance of the class named after queryKey.
            tasks.append(klass(acctDate=self.acctDate,
                               ssisDate=self.ssisDate,
                               queryKey=queryKey,
                               queryCmd=queryCmd,
                               runDesc=self.runDesc,
                               dependQry=dependQry))
        yield tasks
        # get_target() is presumably provided by SQLTask -- confirm.
        self.get_target().touch()
</code></pre>
<p>==========堆栈跟踪</p>
^{pr2}$