功能中的气流DAG?

2024-09-30 10:33:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我在$AIRFLOW_HOME/dags工作。我创建了以下文件:

- common
  |- __init__.py   # empty
  |- common.py     # common code
- foo_v1.py        # dag instanciation

common.py中:

^{pr2}$

foo_v1.py中:

 from common.common import create_dag

 create_dag('foo', 'v1')

使用python测试脚本时,它看起来不错:

 $ python foo_v1.py
 [2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
 creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0

然后在本地启动web服务器和调度程序。我的问题是我没有看到任何id为foo_v1的DAG。没有创建pyc文件。做错了什么?为什么不执行foo_v1.py中的代码?在


Tags: 文件pyhomefooinitcreatecodecommon
2条回答

您需要将dag分配给模块中导出的变量。如果dag不在模块中__dict__气流的DagBag处理器将不会拾取它。在

请在此处查看源代码:https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428

要通过flow找到,create_dag()返回的DAG对象必须位于foo_v1.py模块的全局命名空间中。在全局命名空间中放置DAG的一种方法是将其指定给模块级变量:

from common.common import create_dag

dag = create_dag('foo', 'v1')

另一种方法是使用^{}更新全局命名空间:

^{pr2}$

后者看起来可能是一种过度杀戮,但它对creating multiple DAGs dynamically很有用。例如,在For循环中:

for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')

注意:任何放在$AIRFLOW_HOME/dags中的*.py文件(即使在子目录中,例如在您的例子中是common)都将被气流解析。如果您不想这样做,可以使用^{}packaged DAGs。在

相关问题 更多 >

    热门问题