I have an Airflow DAG that looks a bit like this:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.databricks_operator import \
    DatabricksRunNowOperator
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# These args will get passed on to each operator
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "is_paused_upon_creation": True,
    "timeout_seconds": 604800,
}

# Primary Control Block
with DAG(
    "name",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
) as dag:
    # tasks
    imp_step_1 = DatabricksRunNowOperator(...)
    imp_step_2 = DatabricksRunNowOperator(...)

    data_obj_list_1 = ["a", "b", "c", "1", "2", "3"]
    data_obj_list_2 = ["d", "e", "f"]
    ...
    data_obj_list_n = ["x", "y", "z"]

    def generate_tasks(job):
        return DatabricksRunNowOperator(...)

    dummy_step_1 = DummyOperator(
        task_id="step_1",
        trigger_rule="all_success",
    )
    dummy_step_2 = DummyOperator(
        task_id="step_2",
        trigger_rule="all_success",
    )
    dummy_step_3 = DummyOperator(
        task_id="step_3",
        trigger_rule="all_success",
    )
    ...
    dummy_step_n = DummyOperator(
        task_id="step_n",
        trigger_rule="all_success",
    )

    imp_step_1 >> imp_step_2 >> dummy_step_1
    for obj in data_obj_list_1:
        dummy_step_1 >> generate_tasks(obj) >> dummy_step_2
    for obj in data_obj_list_2:
        dummy_step_2 >> generate_tasks(obj) >> dummy_step_3
    ...
    for obj in data_obj_list_n:
        dummy_step_n >> generate_tasks(obj) >> dummy_step_(n+1)
Basically, the idea is to have an explicit dependency between the first two steps, then dynamically generate the tasks so that all the tasks from each list run together, linked by dummy-operator tasks.

The DAG works just fine. Every list after the first one is broken up because it's more efficient to run a limited number of concurrent tasks (given the compute requirements of the items in those lists relative to the items in the first list). So I had the clever idea of using a single list and dynamically breaking it into multiple sub-lists of 3 (so it's handled automatically even if we add more objects later), then using those sub-lists to dynamically generate the tasks, and dynamically generating the dummy operators that link them together. Basically, recreating the DAG, but almost entirely dynamically. Here's what I've got:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.databricks_operator import \
    DatabricksRunNowOperator
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# These args will get passed on to each operator
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(seconds=30),
    "is_paused_upon_creation": True,
    "timeout_seconds": 604800,
}

# Primary Control Block
with DAG(
    "name",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
) as dag:
    # tasks
    imp_step_1 = DatabricksRunNowOperator(...)
    imp_step_2 = DatabricksRunNowOperator(...)

    data_obj_list_1 = ["a", "b", "c", "1", "2", "3"]
    data_objs = ["d", "e", "f", ...., "x", "y", "z"]
    # break the flat list into sub-lists of 3
    data_objs = [data_objs[i : i + 3] for i in range(0, len(data_objs), 3)]

    def generate_tasks(job):
        return DatabricksRunNowOperator(...)

    def generate_dummies(job):
        return DummyOperator(...)

    dummy_step_1 = DummyOperator(
        task_id="step_1",
        trigger_rule="all_success",
    )
    dummy_step_2 = DummyOperator(
        task_id="step_2",
        trigger_rule="all_success",
    )

    imp_step_1 >> imp_step_2 >> dummy_step_1
    for obj in data_obj_list_1:
        dummy_step_1 >> generate_tasks(obj) >> dummy_step_2
    for i in range(len(data_objs)):
        for k in data_objs[i]:
            generate_dummies(i+2) >> generate_tasks(k) >> generate_dummies(i+3)
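As an aside, the chunking expression itself behaves as intended and can be checked on its own (the sample items below are made-up placeholders):

```python
# Chunk a flat list into sub-lists of 3, same expression as in the DAG.
data_objs = ["d", "e", "f", "g", "h", "i", "j", "x", "y", "z"]
chunks = [data_objs[i : i + 3] for i in range(0, len(data_objs), 3)]
print(chunks)
# → [['d', 'e', 'f'], ['g', 'h', 'i'], ['j', 'x', 'y'], ['z']]
# The last chunk is shorter when len(data_objs) isn't a multiple of 3.
```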
But Airflow throws an error saying:
airflow.exceptions.DuplicateTaskIdFound: Task id 'step_2' has already been added to the DAG
step_2 here refers to the dummy-operator step. How can I get the second version to work the way the first one does, avoid generating a big pile of operators up front, and have it keep working as items are added to or removed from the list, so I don't have to go back and update the number of operators being created?
You do realize that range starts at 0, which means you're trying to generate another dummy operator named dummy_step_2, and that's exactly what your function does, right? Maybe get rid of it and generate everything from that loop alone:

`for i in range(len(data_objs)):`

But that doesn't work either, because generate_dummies() still tries to generate dummy operators with the same task id. Just write down on paper the task ids you're trying to generate and you'll see the problem.
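One way to avoid the collision (a sketch, not from the original thread) is to create each boundary dummy exactly once, before the wiring loop: for n chunks you need n + 1 dummies, and the loop should index into that list instead of calling the factory again for every task. The sketch below models only the task ids in plain Python so it runs without Airflow; in the real DAG each f-string would be a generate_dummies(...) call producing a DummyOperator.

```python
# Hypothetical sample data; in the DAG this is data_objs.
data_objs = ["d", "e", "f", "g", "h", "i", "x", "y", "z"]
chunks = [data_objs[i : i + 3] for i in range(0, len(data_objs), 3)]

# Create every boundary dummy once, up front: step_2 .. step_{len(chunks)+2}.
# In the DAG: dummies = [generate_dummies(i + 2) for i in range(len(chunks) + 1)]
dummies = [f"step_{i + 2}" for i in range(len(chunks) + 1)]
print(dummies)
# → ['step_2', 'step_3', 'step_4', 'step_5']

# Every task id is distinct, so no DuplicateTaskIdFound.
assert len(set(dummies)) == len(dummies)

# The wiring loop then reuses the same objects instead of re-creating them:
# for i, chunk in enumerate(chunks):
#     for k in chunk:
#         dummies[i] >> generate_tasks(k) >> dummies[i + 1]
```

The original nested loop failed because generate_dummies(i + 2) was called once per item within a chunk (so step_2 was requested several times for chunk 0 alone), and generate_dummies(i + 3) on chunk i asks for the same task_id as generate_dummies(i + 2) on chunk i + 1.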