How to dynamically generate and use dummy operators in Airflow

Published 2024-09-30 10:38:03


I have an Airflow DAG that looks roughly like this:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.databricks_operator import \
    DatabricksRunNowOperator
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# These args will get passed on to each operator
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "is_paused_upon_creation": True,
    "timeout_seconds": 604800,
}

# Primary Control Block
with DAG(
    "name",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
) as dag:

    # tasks
    imp_step_1 = DatabricksRunNowOperator(...)

    imp_step_2 = DatabricksRunNowOperator(...)

    data_obj_list_1 = ["a", "b", "c", "1", "2", "3"]

    data_obj_list_2 = ["d", "e", "f"]

    ...
    ...
    ...

    data_obj_list_n = ["x", "y", "z"]
    
    def generate_tasks(job):
        return DatabricksRunNowOperator(...)

    dummy_step_1 = DummyOperator(
        task_id="step_1",
        trigger_rule="all_success",
    )

    dummy_step_2 = DummyOperator(
        task_id="step_2",
        trigger_rule="all_success",
    )

    dummy_step_3 = DummyOperator(
        task_id="step_3",
        trigger_rule="all_success",
    )
    ...
    ...

    dummy_step_n = DummyOperator(
        task_id="step_n",
        trigger_rule="all_success",
    )


    imp_step_1 >> imp_step_2 >> dummy_step_1

    for obj in data_obj_list_1:
        dummy_step_1 >> generate_tasks(obj) >> dummy_step_2

    for obj in data_obj_list_2:
        dummy_step_2 >> generate_tasks(obj) >> dummy_step_3

    ...
    ...

    for obj in data_obj_list_n:
        dummy_step_n >> generate_tasks(obj) >> dummy_step_(n+1)

Basically, the idea is to have an explicit dependency for the first two steps, then dynamically generate the remaining tasks, with all the tasks from each list running together and linked in sequence by dummy-operator tasks.

The DAG works fine, except that every list after the first gets split up, because it is more efficient to run a limited number of concurrent tasks (given the compute requirements of the items in those lists relative to the items in the first list). So I had the clever idea of using a single list and dynamically splitting it into multiple sublists of 3 (so that even if we add more objects later, it is handled automatically), then using those sublists to dynamically generate the tasks, and dynamically generating the dummy operators that link them together. Basically, recreating the DAG, but almost completely dynamically. Here is how far I got:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.databricks_operator import \
    DatabricksRunNowOperator
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# These args will get passed on to each operator
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "is_paused_upon_creation": True,
    "timeout_seconds": 604800,
}

# Primary Control Block
with DAG(
    "name",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
) as dag:

    # tasks
    imp_step_1 = DatabricksRunNowOperator(...)

    imp_step_2 = DatabricksRunNowOperator(...)

    data_obj_list_1 = ["a", "b", "c", "1", "2", "3"]

    data_objs = ["d", "e", "f" .... , "x", "y", "z"]

    data_objs = [data_objs[i : i + 3] for i in range(0, len(data_objs), 3)]

    def generate_tasks(job):
        return DatabricksRunNowOperator(...)


    def generate_dummies(job):
        return DummyOperator(...)

    dummy_step_1 = DummyOperator(
        task_id="step_1",
        trigger_rule="all_success",
    )

    dummy_step_2 = DummyOperator(
        task_id="step_2",
        trigger_rule="all_success",
    )


    imp_step_1 >> imp_step_2 >> dummy_step_1

    for obj in data_obj_list_1:
        dummy_step_1 >> generate_tasks(obj) >> dummy_step_2

    for i in range(len(data_objs)):
        for k in data_objs[i]:
            generate_dummies(i+2) >> generate_tasks(k) >> generate_dummies(i+3)

But Airflow throws an error saying:

airflow.exceptions.DuplicateTaskIdFound: Task id 'step_2' has already been added to the DAG

Step_2 here refers to the dummy-operator step. How can I get the second version to work like the first, avoid declaring a pile of operators up front, and have it keep working as items are added to or removed from the list, so that I never have to go back and update the number of operators being created?


1 Answer

You realize that range starts from 0, which means you are trying to generate another dummy operator named dummy_step_2, which is exactly what your function does, right? Maybe get rid of:

    for obj in data_obj_list_1:
        dummy_step_1 >> generate_tasks(obj) >> dummy_step_2

and generate everything from this one loop instead:

    for i in range(len(data_objs)):
        for k in data_objs[i]:
            generate_dummies(i) >> generate_tasks(k) >> generate_dummies(i+1)

Even then it won't work as written, because generate_dummies() keeps trying to generate dummy operators with the same task id. Just write down on paper the task ids you are trying to generate, and you will see the problem.
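Tracing those task ids makes the collision obvious and also suggests the fix: memoize the dummy factory so each step_&lt;n&gt; is only created once. Below is a minimal pure-Python sketch (no Airflow required; `buggy_ids` and `memoized_ids` are hypothetical helper names, and the `step_<n>` naming follows the question's code):

```python
def buggy_ids(chunks):
    """Collect the dummy task_ids the original loop would try to create."""
    ids = []
    for i in range(len(chunks)):
        for k in chunks[i]:
            ids.append(f"step_{i + 2}")   # generate_dummies(i + 2)
            ids.append(f"step_{i + 3}")   # generate_dummies(i + 3)
    return ids


def memoized_ids(chunks):
    """Create each dummy id at most once, as a memoized factory would."""
    created = {}
    for i in range(len(chunks)):
        for _k in chunks[i]:
            for n in (i + 2, i + 3):
                if n not in created:      # guard: build step_<n> only once
                    created[n] = f"step_{n}"
    return list(created.values())


chunks = [["d", "e", "f"], ["g", "h", "i"]]
print(buggy_ids(chunks))     # 'step_2' appears 3 times -> DuplicateTaskIdFound
print(memoized_ids(chunks))  # each id once: ['step_2', 'step_3', 'step_4']
```

In the real DAG, the same idea is a dict cache inside generate_dummies: look the index up first and construct `DummyOperator(task_id=f"step_{n}", ...)` only on a miss. The `if n not in created:` guard matters, since constructing the operator eagerly (even via `dict.setdefault`) would still register a duplicate task inside the DAG context.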
