我们迁移到了puckel/flow-1.10.2,试图解决在多个环境中出现的性能低下的问题。我们使用的是AWS ECS上的ECS Airflow 1.10.2。有趣的是,CPU/mem从未超过80%。气流元数据库的利用率也很低。在
下面我列出了我们使用的配置,DagBag解析时间加上在纯Python中运行DagBag()
的cProfile
输出的详细执行时间。在
我们的一些DAG从create_subdag_functions.py
导入一个函数,该函数返回我们在12个DAG中使用的DAG。大多数这些DAG和它们对应的子DAG只在小时内运行,但是每10分钟就有1个DAG/3个子DAG运行一次。在
max_threads = 2
dag_dir_list_interval = 300
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor
一些观察结果:
airflow list_dags -r
也需要很长的时间,即使示例dag被禁用,也会运行它们。为每个DAG解析的时间将跳来跳去。在DagBag()
函数时,我们发现DagBag()大部分时间都花在airflow.utils.dag_processing.list_py_paths
函数中,这可能是因为/usr/local/flow/dags文件夹中有50多个sql文件我尝试过的解决方案:
max_threads
min_file_process_interval
-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file | duration | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py | 74.819988 | 1 | 21 | ['dag1']
/dag3.py | 53.193430000000006 | 1 | 17 | ['dag3']
/dag8.py | 34.535742 | 5 | 40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py | 21.543944000000003 | 6 | 38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py | 18.458316000000003 | 3 | 16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py | 14.652806000000002 | 0 | 0 | []
/dag7.py | 13.051984000000001 | 2 | 8 | ['dag11', 'dag11.subdag1']
/dag8.py | 10.02703 | 1 | 21 | ['dag5']
/dag9.py | 9.834226000000001 | 1 | 1 | ['dag10']
/dag10.py | 9.575258000000002 | 1 | 28 | ['dag2']
/dag11.py | 9.418897999999999 | 1 | 9 | ['dag6']
/dag12.py | 9.319210000000002 | 1 | 6 | ['dag12']
/dag13.py | 8.686964 | 1 | 26 | ['dag7']
注意:为了简洁起见,从第二个输出中删除了示例dag
from airflow.models import DagBag; DagBag()
的cProfile输出:
{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from
ncalls tottime percall cumtime percall filename:lineno(function)
997 443.441 0.445 443.441 0.445 {built-in method io.open}
198 186.978 0.944 483.629 2.443 zipfile.py:198(is_zipfile)
642 65.069 0.101 65.069 0.101 {method 'close' of '_io.BufferedReader' objects}
1351 45.924 0.034 45.946 0.034 <frozen importlib._bootstrap_external>:830(get_data)
7916 39.403 0.005 39.403 0.005 {built-in method posix.stat}
2/1 22.927 11.464 544.419 544.419 dag_processing.py:220(list_py_file_paths)
33 18.992 0.576 289.797 8.782 models.py:321(process_file)
22 8.723 0.397 8.723 0.397 {built-in method posix.scandir}
412 2.379 0.006 2.379 0.006 {built-in method posix.listdir}
9 1.301 0.145 3.058 0.340 linecache.py:82(updatecache)
1682/355 0.186 0.000 0.731 0.002 sre_parse.py:470(_parse)
1255 0.183 0.000 0.183 0.000 {built-in method marshal.loads}
3092/325 0.143 0.000 0.647 0.002 sre_compile.py:64(_compile)
59 0.139 0.002 0.139 0.002 {built-in method builtins.compile}
25270 0.134 0.000 0.210 0.000 sre_parse.py:253(get)
52266 0.132 0.000 0.132 0.000 {method 'append' of 'list' objects}
4210/4145 0.131 0.000 1.760 0.000 {built-in method builtins.__build_class__}
气流性能下降:
目前没有回答
相关问题 更多 >
编程相关推荐