python变量中的气流参考dag名称

2024-09-29 17:17:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个像下面这样的dag

twomins = DAG(
    'upsert_every_2mins',
    default_args=default_args,
    description='Every 2mins',
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    doc_md = docs
)

然后我尝试通过python变量引用这个DAG名称

dagname = twomins

我的任务如下所示

max_ts = PythonOperator(
        task_id="get_maxts",
        python_callable=get_max_ts,
        provide_context=True,
        dag=dagname 
    )

但它的错误是这样的

Traceback (most recent call last):
  File "upsert_every_2and10mins.py", line 148, in <module>
    dag=sync_interval
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 72, in wrapper
    dag_args = copy(dag.default_args) or {}
AttributeError: 'str' object has no attribute 'default_args'

我是不是遗漏了什么

我使用它的原因是,一些ETL管道将决定应该选择哪个计划。如果配置有2min,那么dagtwomins将用于该任务


Tags: pydefaultgetlineargsmaxfiledag
1条回答
网友
1楼 · 发布于 2024-09-29 17:17:01

我认为你错过了气流和DAG工作原理的一个关键点。您拥有的文件只是一个DAG配置文件,它实际上并没有执行某种意义上的代码。因此,我认为您不能通过这样的条件检查来决定文件中的DAG

tutorial开始:

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code.

以及

People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object.

如果检查依赖于间隔,那么为什么不更改schedule_interval

相关问题 更多 >

    热门问题