我在$AIRFLOW_HOME/dags
工作。我创建了以下文件:
- common
|- __init__.py # empty
|- common.py # common code
- foo_v1.py # dag instanciation
在common.py
中:
在foo_v1.py
中:
from common.common import create_dag
create_dag('foo', 'v1')
使用python测试脚本时,它看起来不错:
$ python foo_v1.py
[2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0
然后在本地启动web服务器和调度程序。我的问题是我没有看到任何id为foo_v1
的DAG。没有创建pyc
文件。做错了什么?为什么不执行foo_v1.py
中的代码?在
您需要将dag分配给模块中导出的变量。如果dag不在模块中
__dict__
气流的DagBag处理器将不会拾取它。在请在此处查看源代码:https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428
要通过flow找到,
create_dag()
返回的DAG对象必须位于foo_v1.py
模块的全局命名空间中。在全局命名空间中放置DAG的一种方法是将其指定给模块级变量:另一种方法是使用^{} 更新全局命名空间:
^{pr2}$后者看起来可能是一种过度杀戮,但它对creating multiple DAGs dynamically很有用。例如,在For循环中:
注意:任何放在} 或packaged DAGs。在
$AIRFLOW_HOME/dags
中的*.py
文件(即使在子目录中,例如在您的例子中是common
)都将被气流解析。如果您不想这样做,可以使用^{相关问题 更多 >
编程相关推荐