我有一个像下面这样的dag
twomins = DAG(
'upsert_every_2mins',
default_args=default_args,
description='Every 2mins',
schedule_interval=None,
catchup=False,
max_active_runs=1,
doc_md = docs
)
然后我尝试通过python变量引用这个DAG名称
dagname = twomins
我的任务如下所示
max_ts = PythonOperator(
task_id="get_maxts",
python_callable=get_max_ts,
provide_context=True,
dag=dagname
)
但它的错误是这样的
Traceback (most recent call last):
File "upsert_every_2and10mins.py", line 148, in <module>
dag=sync_interval
File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 72, in wrapper
dag_args = copy(dag.default_args) or {}
AttributeError: 'str' object has no attribute 'default_args'
我是不是遗漏了什么
我使用它的原因是,一些ETL管道将决定应该选择哪个计划。如果配置有2min,那么dagtwomins
将用于该任务
我认为你错过了气流和DAG工作原理的一个关键点。您拥有的文件只是一个DAG配置文件,它实际上并没有执行某种意义上的代码。因此,我认为您不能通过这样的条件检查来决定文件中的DAG
从tutorial开始:
以及
如果检查依赖于间隔,那么为什么不更改
schedule_interval
相关问题 更多 >
编程相关推荐