<p>我觉得你让事情变得更复杂了。首先,当任务的完整列表直到运行时才知道时,动态依赖项是有用的。例如,您可能需要运行一个任务来查询数据库,而对于查询返回的每一行,您都需要一个新的依赖关系。在</p>
<p>这与以编程方式创建一组任务及其依赖项明显不同,这就是您在示例中所做的感觉。在</p>
<p>下面的玩具代码展示了如何实现您要做的事情:</p>
<pre><code>import luigi
import datetime
import logging
logger = logging.getLogger('luigi-interface')
task_list = {
'taskA': ['taskA_command', ''],
'taskB': ['taskB command', 'taskA'],
'taskC': ['taskC command', 'taskA'],
'taskD': ['taskD command', 'taskB'],
}
// Equivalent of your BDX_Task class
class MyTask(luigi.Task):
task_date = luigi.DateParameter()
task_name = luigi.Parameter()
task_command = luigi.Parameter()
dependent_task_name = luigi.Parameter()
def __init__(self, *args, **kwargs):
super(MyTask, self).__init__(*args, **kwargs)
logger.debug('MyTask.__init__ called for task_name="{}"'.format(self.task_name))
def output(self):
filename = 'output_files/{date:%Y%m%d}/{name}.output'.format(date=self.task_date,name=self.task_name)
return luigi.LocalTarget(filename)
def requires(self):
if self.dependent_task_name != '' and self.dependent_task_name in task_list:
dependent_task_command, next_dependent_task_name = task_list[self.dependent_task_name]
return [self.__class__(
task_date=self.task_date,
task_name=self.dependent_task_name,
task_command=dependent_task_command,
dependent_task_name=next_dependent_task_name,
)]
else:
return []
def run(self):
with self.output().open('w') as handle:
handle.write('Command to run: "{cmd}"'.format(cmd=self.task_command))
// Equivalent of your BDX_Query_0XX class
class myWrapperTask(luigi.WrapperTask):
task_date = luigi.DateParameter(default=datetime.date.today())
def requires(self):
for task_name, (task_command, dep_name) in task_list.items():
yield MyTask(
task_date=self.task_date,
task_name=task_name,
task_command=task_command,
dependent_task_name=dep_name,
)
</code></pre>
<p>产生的输出</p>
^{pr2}$
<p>与代码的主要结构差异在于,my <code>task_list</code>是在任务类之外定义的。有可能您简化了SO,并且您的<code>cmdList</code>实际上是另一个任务的输出,不能在类之外定义。您可以通过将列表添加到<code>globals()</code>来解决这个问题,或者您可以将完整的命令列表作为一个参数传递给<code>MyTask</code>/<code>BDX_Task</code>,以便在<code>MyTask.requires()</code>中引用它(如果列表可能很大,可能不是最好的主意)。而且,正如您最初所做的那样,您不能像我的示例那样使用<code>luigi.WrapperTask</code>。在</p>