气流bash操作员收集回流cod

2024-06-01 07:02:36 发布

您现在位置:Python中文网/ 问答频道 /正文

请容忍我,因为我刚刚开始使用airlow,我要做的是收集bashcomperator任务的返回代码并将其保存到一个局部变量中,然后基于该返回代码分支到另一个任务。我的问题是如何让bash操作符返回一些内容。以下是我的代码段:

dag = DAG(dag_id='dag_1',
      default_args=default_args,
      schedule_interval='0 2 * * *',
      user_defined_macros=user_def_macros,
      dagrun_timeout=timedelta(minutes=60)
      )
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)

我正在尝试xcom_的推送,但老实说,我不知道它是如何工作的。。这是收集结果的正确方法吗?在日志中,最后一行是:命令退出,返回代码为0


Tags: 代码bashiddefaulttaskargscommandmacros
2条回答

根据BashOperator doc

If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes

知道了这一点,您只需要让bash脚本最后打印错误代码,所以将以下内容附加到您的bash_command

<your code> ; echo $?

就你而言,它是:

oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)

你能把所有的资料都寄出去吗。我认为你在解释气流是如何工作的方面有问题

从Task1(如果它是bash操作符)可以执行以下操作:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag)

在任务2中:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag)

其中ti是task\u实例变量,而{{}符号用于访问Variables部分

相关问题 更多 >