有没有办法将任务的返回值存储在Python变量中并与下游任务共享(不使用xcom或AIFLOW变量)

2024-06-17 18:57:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个dag,它将从数据库中读取一组配置,然后使用bash操作符执行一系列Python脚本。以前读取的配置将作为参数传递

问题是我没有找到一种有效的方法与其他下游操作员共享配置。我设计了下面的dag。以下是我的担忧

  1. 我不确定将进行多少次DB调用来获取jinja模板中所需的值(在下面的示例中)

  2. 此外,由于配置在每个任务中都是相同的,我不确定每次从数据库中获取它是否是一个好主意这就是我不想同时使用xcom的原因。我使用了airflow变量,因为JSON解析可以在一行中进行。但是,我想数据库调用问题仍然存在

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        s = hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        s = s.set_index('laptopName', drop=False)
        print(s)
        s = s.to_json(orient='index')
        Variable.set('jobconfig', s)



t1 = ReturningMySqlOperator(
    task_id='mysql_query',
    sql='SELECT * FROM laptops',
    mysql_conn_id='mysql_db_temp',
    dag=dag)



t3 = BashOperator(
    task_id='sequence_one',
    bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t4 = BashOperator(
    task_id='sequence_two',
    bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t5 = BashOperator(
    task_id='sequence_three',
    bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t6 = BashOperator(
    task_id='sequence_four',
    bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t1 >> t3 
t3 >> [t4,t6]

Tags: selfbashidjsontaskvarmysqlcommand
1条回答
网友
1楼 · 发布于 2024-06-17 18:57:38

第一点:

I am not sure how many DB calls will be made to fetch the values required inside the jinja templates (in the below example).

在您提供的示例中,您在每个sequence_x任务中建立两个到元数据数据库的连接,每个{{var.json.jobconfig.xx}}调用一个连接。好消息是,调度程序不会执行这些操作,因此不会在每个心跳间隔都执行这些操作。从Astronomer guide

Since all top-level code in DAG files is interpreted every scheduler "heartbeat," macros and templating allow run-time tasks to be offloaded to the executor instead of the scheduler.

第二点:

我认为这里的关键方面是,您希望向下游传递的值总是相同的,并且在执行T1后不会改变。 这里可能有几种方法,但是如果您想尽量减少对DB的调用次数,并且完全避免XComs,那么应该使用TriggerDagRunOperator

为此,您必须将DAG分为两部分,让控制器DAG执行从MySQL获取数据的任务,触发第二个DAG,使用从控制器DAG获得的值执行所有^{。您可以使用conf参数传入数据

下面是一个基于官方Airflow example DAGs的示例:

控制器DAG:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def _data_from_mysql():
    # fetch data from the DB or anywhere else
    # set a Variable
    data = {'legion': {'company': 'some_company', 'laptop': 'great_laptop'}}
    Variable.set('jobconfig', data, serialize_json=True)


dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

get_data_from_MySql = PythonOperator(
    task_id='get_data_from_MySql',
    python_callable=_data_from_mysql,
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    # Ensure this equals the dag_id of the DAG to trigger
    trigger_dag_id="example_trigger_target_dag",
    conf={"message": "Company is {{var.json.jobconfig.legion.company}}"},
    execution_date='{{ds}}',
    dag=dag,
)
get_data_from_MySql >> trigger

执行trigger任务时,将包含键message,作为第二个DAGDAG运行配置的一部分

目标DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)


def run_this_func(**context):
    """
    Print the payload "message" passed to the DagRun conf attribute.

    :param context: The execution context
    :type context: dict
    """
    print("Remotely received value of {} for key=message".format(
        context["dag_run"].conf["message"]))


run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
    dag=dag
)

本例中bash_task_1的日志将包括:

[2021-05-05 15:40:35,410] {bash.py:158} INFO - Running command: echo "Here is the message: $message"
[2021-05-05 15:40:35,418] {bash.py:169} INFO - Output:
[2021-05-05 15:40:35,419] {bash.py:173} INFO - Here is the message: Company is some_company
[2021-05-05 15:40:35,420] {bash.py:177} INFO - Command exited with return code 0

概述:

  • 一个任务是从数据库中获取数据并将其设置为Variable
  • 触发第二个DAGconf中的Variable传递数据
  • 在您的目标DAG中,使用来自dag_run.conf的数据

这样,当触发第二个DAG时,您只能从metadaba DB读取一次

另外,为了避免在BashOperator任务定义期间重复太多代码,可以执行以下操作:

templated_bash_cmd = """
python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
"""

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command=templated_bash_cmd,
    params={
        'path_to_script': 'path/sequence1.py'
    },
    dag=dag
)

让我知道这是否对你有效

相关问题 更多 >