运行时配置为PythonOp

2024-09-30 08:30:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我找不到任何关于如何使用通过“-c”传递的JSON变量的工作文档,例如回填作业。在

我一直在打印我的python任务**kwargs以找出答案,但我仍然无法确定它。提供_context=True

谁能给我指出正确的方向吗?在

所以,我想做的是:

airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12

我有一只Python:

^{pr2}$

在run_task中,我想访问override变量:

def run_task(*args, **kwargs): 

    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))

但是我找不到访问这个覆盖变量的方法

[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))

编辑:我确实找到了一个配置设置和描述,似乎表明这些参数需要在代码中设置

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True

donot_pickle参数设置为False。在


Tags: runinpyhometaskexecutegetconf
2条回答

您可以使用-c '{"key":"value"}'从CLI传递参数,然后在Python可调用函数中将其用作"dag_run.conf["key"]"

就你而言

操作员

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    provide_context=True,
    dag = this_dag
)

可调用函数

^{pr2}$

你需要做两件事。在

  1. kwargs['run_dag'].conf.get('override')替换为kwargs['dag_run'].conf.get('override')。在
  2. 同时更改签名 从def run_task(*args, **kwargs):到{}

相关问题 更多 >

    热门问题