我正在编写一个dag,它将从数据库中读取一组配置,然后使用bash操作符执行一系列Python脚本。以前读取的配置将作为参数传递
问题是我没有找到一种有效的方法与其他下游操作员共享配置。我设计了下面的dag。以下是我的担忧
我不确定将进行多少次DB调用来获取jinja模板中所需的值(在下面的示例中)
此外,由于配置在每个任务中都是相同的,我不确定每次从数据库中获取它是否是一个好主意这就是我不想同时使用xcom的原因。我使用了airflow变量,因为JSON解析可以在一行中进行。但是,我想数据库调用问题仍然存在
class ReturningMySqlOperator(MySqlOperator):
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
s = hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
s = s.set_index('laptopName', drop=False)
print(s)
s = s.to_json(orient='index')
Variable.set('jobconfig', s)
t1 = ReturningMySqlOperator(
task_id='mysql_query',
sql='SELECT * FROM laptops',
mysql_conn_id='mysql_db_temp',
dag=dag)
t3 = BashOperator(
task_id='sequence_one',
bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
dag=dag)
t4 = BashOperator(
task_id='sequence_two',
bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
dag=dag)
t5 = BashOperator(
task_id='sequence_three',
bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
dag=dag)
t6 = BashOperator(
task_id='sequence_four',
bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
dag=dag)
t1 >> t3
t3 >> [t4,t6]
第一点:
在您提供的示例中,您在每个
sequence_x
任务中建立两个到元数据数据库的连接,每个{{var.json.jobconfig.xx}}
调用一个连接。好消息是,调度程序不会执行这些操作,因此不会在每个心跳间隔都执行这些操作。从Astronomer guide:第二点:
我认为这里的关键方面是,您希望向下游传递的值总是相同的,并且在执行
T1
后不会改变。 这里可能有几种方法,但是如果您想尽量减少对DB的调用次数,并且完全避免XComs
,那么应该使用TriggerDagRunOperator
为此,您必须将DAG分为两部分,让控制器DAG执行从MySQL获取数据的任务,触发第二个DAG,使用从控制器DAG获得的值执行所有^{。您可以使用
conf
参数传入数据下面是一个基于官方Airflow example DAGs的示例:
控制器DAG:
执行
trigger
任务时,将包含键message
,作为第二个DAG的DAG运行配置的一部分目标DAG:
本例中
bash_task_1
的日志将包括:概述:
Variable
conf
中的Variable
传递数据dag_run.conf
的数据这样,当触发第二个DAG时,您只能从metadaba DB读取一次
另外,为了避免在
BashOperator
任务定义期间重复太多代码,可以执行以下操作:让我知道这是否对你有效
相关问题 更多 >
编程相关推荐