Luigi: "tasks were not granted run permission by the scheduler"

Posted 2024-10-01 11:37:01


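I am trying to create Luigi tasks dynamically (based on the attributes in cmdList) and make each previous task a dependency of the next one. SQLTask is a luigi.Task. However, when I run this code I get "This progress looks :| because there were tasks that were not granted run permission by the scheduler". What am I missing?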

import subprocess

import luigi

# SQLTask is the asker's own luigi.Task subclass (not shown here)
class BDX_Task(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter(default=None)
    queryKey = luigi.Parameter()
    queryCmd = luigi.Parameter()
    runDesc = luigi.Parameter()
    dependQry = luigi.Parameter()

    def run(self):
        print(subprocess.call(self.queryCmd, shell=True))
        self.get_target().touch()

def dep_s_dep(cmdList, dep1):
    """
        Return the cmdList entry (key, cmd, dep) of the dependency task dep1.
    """
    dep2 = [(key,cmd,dep) for key, cmd, dep in cmdList if key==dep1]
    return dep2[0]

class BDX_Query_0XX(SQLTask):
    acctDate = luigi.Parameter()
    ssisDate = luigi.Parameter()  
    runDesc = luigi.Parameter()
    depend_task = ""

    def run(self):
        YY = self.acctDate[:4]
        MM = self.acctDate[4:6]
        acctDate = self.acctDate
        ssisDate = self.ssisDate
        runDesc = self.runDesc
        bdx_sql = r'r:\\1.SQL\\BDX_SQL\\'
        cmdList = [
                ('BDX010',f'{bdx_sql}BDX_001_NI_DM 010.sql -o output010.txt',None),
                ('BDX020',f'{bdx_sql}BDX_001_NI_DM 020.sql -o output020.txt','BDX010'),
                ('BDX022a',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 010.sql -o output022a.txt','BDX020'),
                ('BDX022b',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 020.sql -o output022b.txt -v Year1={YY} MM={MM}','BDX022a'),
                ('BDX022c',f'{bdx_sql}BDX_022_P038_All_Final_CatAdj 030.sql -o output022c.txt -v Year={YY} Month={MM}', 'BDX022b'),
                ('BDX023',f'{bdx_sql}BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt ','BDX020'),
                ('BDX024',f'{bdx_sql}BDX_024_P031_ITD_All_Final_CatAdj.sql -o output024.txt','BDX020'),
                ('BDX025a',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 010.sql -o output025a.txt','BDX020'),
                ('BDX025b',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 020.sql -o output025b.txt -v Year={YY} Month={MM}','BDX025a'),
                ('BDX025c',f'{bdx_sql}BDX_025_P038_All_Final_CatAdj 030.sql -o output025c.txt -v YYMM={acctDate}','BDX025b')
            ]

        tasks = []
        for queryKey, queryCmd, dependQry in cmdList:
            class_name = queryKey

            klass = type(queryKey, (BDX_Task,),{}) # {'acctDate': self.acctDate, 'queryKey': queryKey, 'queryCmd': queryCmd, 'runDesc': self.runDesc, 'dependQry': dependQry})

            if dependQry != '':
                dep1 = dep_s_dep(cmdList, dependQry) # info about dependency task (key, cmd, dep's dep)
                print(f"{queryKey}'s dep1", dep1)
                depend_task = [globals()[dependQry](acctDate=self.acctDate,
                                                    ssisDate=self.ssisDate,
                                                    queryKey=dep1[0],
                                                    queryCmd=dep1[1],
                                                    runDesc=self.runDesc,
                                                    dependQry=dep1[2])]

                def requires1(cls):
                    return depend_task

                setattr(klass, "requires", classmethod(requires1))

            globals()[queryKey] = klass  # make the class available at the module level

            tasks.append(globals()[queryKey](acctDate=self.acctDate, ssisDate=self.ssisDate, queryKey=queryKey, queryCmd=queryCmd, runDesc=self.runDesc, dependQry=dependQry)) # this adds a Task class named after queryKey to the dependencies
        yield tasks

        self.get_target().touch()

========== Stack trace

[stack trace not preserved]

2 answers

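My guess is that this warning:
Parameter "task_process_context" with value "None" is not of type string.

shows that some tasks did not receive the expected output.
In that case, Luigi considers the task not complete.

Make sure every task returns the defined type (the one the downstream task expects) as input.
In your case, None is breaking the pipeline execution: make sure these tasks return a str instead of None when they fail or have no data to return.

Use an empty string "" or a keyword string such as "empty" to hold an empty dependency.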
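A minimal sketch of this point, using a copy of the question's dep_s_dep helper and a two-entry cmdList with shortened commands (upstream_of is a hypothetical wrapper around the question's if dependQry != '' check). With None as the root's dependency, that check is true, so dep_s_dep is called with None, matches nothing, and indexes into an empty list; with '' the root is simply skipped.

```python
def dep_s_dep(cmd_list, dep1):
    """Same logic as the question's helper: return the entry whose key is dep1."""
    dep2 = [(key, cmd, dep) for key, cmd, dep in cmd_list if key == dep1]
    return dep2[0]  # IndexError if nothing matched


def upstream_of(cmd_list, depend_qry):
    """Hypothetical wrapper: return the upstream entry, or None for a root task."""
    if depend_qry != '':  # the question's check; None also passes it
        return dep_s_dep(cmd_list, depend_qry)
    return None


cmd_list = [
    ('BDX010', 'BDX_001_NI_DM 010.sql', ''),  # root task: '' instead of None
    ('BDX020', 'BDX_001_NI_DM 020.sql', 'BDX010'),
]

print(upstream_of(cmd_list, ''))        # None
print(upstream_of(cmd_list, 'BDX010'))  # ('BDX010', 'BDX_001_NI_DM 010.sql', '')

try:
    upstream_of(cmd_list, None)  # the original root value
except IndexError:
    print('None passes the != "" check and dep_s_dep finds no match')
```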

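I think you're making this more complicated than it needs to be. First, dynamic dependencies are useful when the full list of tasks is not known until run time. For example, you might need to run a task that queries a database, and for each row the query returns you need a new dependency.

That is quite different from programmatically creating a set of tasks and their dependencies, which is what your example seems to be doing.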
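To illustrate the distinction: when the whole table is available up front, as the asker's cmdList is, the complete dependency graph can be resolved before anything runs. The sketch below uses the standard library's graphlib.TopologicalSorter (Python 3.9+) purely as an illustration; it is not how Luigi schedules tasks. The table is a hypothetical excerpt of cmdList reduced to key and dependency.

```python
from graphlib import TopologicalSorter

# Hypothetical excerpt of cmdList reduced to key -> dependency ('' = none).
deps = {
    'BDX010': '',
    'BDX020': 'BDX010',
    'BDX022a': 'BDX020',
    'BDX022b': 'BDX022a',
    'BDX023': 'BDX020',
}

# TopologicalSorter expects node -> iterable of predecessors.
graph = {key: ({dep} if dep else set()) for key, dep in deps.items()}

# Every task appears after the task it depends on.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Nothing in this ordering depends on the result of running a task, which is why the static requires() approach in the answer below is enough here.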

The toy code below shows how to achieve what you're trying to do:

import luigi
import datetime
import logging

logger = logging.getLogger('luigi-interface')

task_list = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}

# Equivalent of your BDX_Task class
class MyTask(luigi.Task):

    task_date = luigi.DateParameter()
    task_name = luigi.Parameter()
    task_command = luigi.Parameter()
    dependent_task_name = luigi.Parameter()

    def __init__(self, *args, **kwargs):
        super(MyTask, self).__init__(*args, **kwargs)
        logger.debug('MyTask.__init__ called for task_name="{}"'.format(self.task_name))

    def output(self):
        filename = 'output_files/{date:%Y%m%d}/{name}.output'.format(date=self.task_date,name=self.task_name)
        return luigi.LocalTarget(filename)

    def requires(self):
        if self.dependent_task_name != '' and self.dependent_task_name in task_list:
            dependent_task_command, next_dependent_task_name = task_list[self.dependent_task_name]
            return [self.__class__(
                task_date=self.task_date,
                task_name=self.dependent_task_name,
                task_command=dependent_task_command,
                dependent_task_name=next_dependent_task_name,
            )]
        else:
            return []

    def run(self):
        with self.output().open('w') as handle:
            handle.write('Command to run: "{cmd}"'.format(cmd=self.task_command))


# Equivalent of your BDX_Query_0XX class
class myWrapperTask(luigi.WrapperTask):
    task_date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        for task_name, (task_command, dep_name) in task_list.items():
            yield MyTask(
                task_date=self.task_date,
                task_name=task_name,
                task_command=task_command,
                dependent_task_name=dep_name,
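            )


if __name__ == '__main__':
    # Assumed entry point: build the wrapper with Luigi's local (in-process) scheduler.
    luigi.build([myWrapperTask()], local_scheduler=True)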

Which produces the output:

[output not preserved]
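To see which tasks the wrapper ends up scheduling, the recursion in MyTask.requires() can be traced without Luigi. This pure-Python sketch (requires_chain is a hypothetical helper) follows dependent_task_name through task_list with the same stopping condition as requires(). Luigi identifies a task by its class and parameter values, so taskA, which several tasks require with identical parameters, is scheduled only once; the set below mimics that de-duplication.

```python
# Same table as the answer's example: name -> [command, dependency name].
task_list = {
    'taskA': ['taskA_command', ''],
    'taskB': ['taskB command', 'taskA'],
    'taskC': ['taskC command', 'taskA'],
    'taskD': ['taskD command', 'taskB'],
}


def requires_chain(task_name):
    """Return the upstream task names requires() reaches, nearest first."""
    chain = []
    dep = task_list[task_name][1]
    # Same condition as MyTask.requires(): stop at '' or an unknown name.
    while dep != '' and dep in task_list:
        chain.append(dep)
        dep = task_list[dep][1]
    return chain


scheduled = set()
for name in task_list:  # what myWrapperTask.requires() yields
    chain = requires_chain(name)
    scheduled.add(name)
    scheduled.update(chain)
    print(name, 'requires', chain)

print(sorted(scheduled))
```

Note that, like the answer's requires(), this walk assumes the table has no cycles.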

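The main structural difference from your code is that my task_list is defined outside the task classes. It's possible you simplified this for SO and your cmdList is actually the output of another task, so it can't be defined outside the class. You could work around that by adding the list to globals(), or you could pass the complete command list as a parameter to MyTask/BDX_Task so that it can be referenced in MyTask.requires() (probably not the best idea if the list can be large). Also, as in your original code, you could skip luigi.WrapperTask rather than using it as my example does.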
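If cmdList is kept as the source of truth, it can be converted into the dictionary shape used by task_list with a single comprehension, mapping None to '' as the first answer suggests so every value is a str. The function name to_task_list is hypothetical, and only three shortened entries from cmdList are shown.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def to_task_list(cmd_list: Iterable[Tuple[str, str, Optional[str]]]) -> Dict[str, List[str]]:
    """Convert (key, command, dependency) tuples to {key: [command, dependency]}.

    A missing dependency (None) becomes '' so the value passed to a plain
    luigi.Parameter is always a str.
    """
    return {key: [cmd, dep or ''] for key, cmd, dep in cmd_list}


cmd_list = [
    ('BDX010', 'BDX_001_NI_DM 010.sql -o output010.txt', None),
    ('BDX020', 'BDX_001_NI_DM 020.sql -o output020.txt', 'BDX010'),
    ('BDX023', 'BDX_023_P031_MTD_All_Final_CatAdj.sql -o output023.txt', 'BDX020'),
]

task_list = to_task_list(cmd_list)
print(task_list['BDX010'])  # ['BDX_001_NI_DM 010.sql -o output010.txt', '']
```

The resulting dict can then be read by MyTask.requires() exactly like the hand-written task_list above.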
