ECS Airflow 1.10.2 performance issues: operators and tasks take 10x longer

Posted 2024-09-27 00:15:09


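We moved to puckel/airflow-1.10.2 to try to resolve poor performance we have seen in multiple environments. We are running Airflow 1.10.2 on AWS ECS. Interestingly, CPU/memory never goes above 80%, and utilization of the Airflow metadata database is also low.

Below I have listed the configuration we use, the DagBag parsing times, and the detailed execution times from the cProfile output of running DagBag() in plain Python.

Some of our DAGs import a function from create_subdag_functions.py that returns a DAG we use in 12 DAGs. Most of these DAGs and their corresponding subdags run only hourly, but 1 DAG / 3 subdags run every 10 minutes.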

max_threads = 2
dag_dir_list_interval = 300 
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
executor = CeleryExecutor

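Some observations:

  • airflow list_dags -r also takes a very long time, and it runs the example DAGs even though they are disabled. The parse time reported for each DAG jumps around.
  • The duration per DAG is inconsistent (but only for our DAGs, not for the examples).
  • There is usually a large jump in parse times, e.g. 5 DAGs have a duration of <1, and the next 4 DAGs have a duration of 20+.
  • When we profiled DagBag() with cProfile, we found that DagBag() spends most of its time in the airflow.utils.dag_processing.list_py_file_paths function, possibly because the /usr/local/airflow/dags folder contains 50+ SQL files.
  • Looking at the landing times, task times jumped by an order of magnitude between two particular runs. I tried looking at the logs etc., but nothing stands out between the two runs. I have attached the picture at the bottom. This performance loss occurred on Airflow 1.10.0.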
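The cProfile measurement mentioned in the observations can be reproduced roughly as follows. This is a minimal sketch, not the exact command used for the output further down: `profile_callable` is a hypothetical helper name, and the stand-in workload exists only so the snippet runs without Airflow installed.

```python
import cProfile
import io
import pstats


def profile_callable(func, top=15):
    """Run func() under cProfile and return the report sorted by tottime."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func()
    finally:
        profiler.disable()
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    # "tottime" is time spent inside a function itself, excluding callees.
    stats.sort_stats("tottime").print_stats(top)
    return buffer.getvalue()


if __name__ == "__main__":
    # On a machine with Airflow installed, pass the DagBag constructor instead:
    #   from airflow.models import DagBag
    #   print(profile_callable(DagBag))
    # Stand-in workload (an assumption, only to keep the sketch runnable):
    print(profile_callable(lambda: sorted(range(100000), key=lambda x: -x)))
```

Sorting by tottime rather than cumtime puts the low-level calls (file opens, stat calls) at the top, which is the view that matters when the suspicion is slow file access.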

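Solutions I have tried:

  • Increasing/decreasing max_threads
  • Increasing/removing min_file_process_interval
  • Clearing and reloading the entire Airflow database
  • Shutting down and redeploying the environment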

Output of airflow list_dags -r:

-------------------------------------------------------------------
DagBag loading stats for /usr/local/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 42
Total task number: 311
DagBag parsing time: 296.5826819999999
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
file                          | duration           | dag_num | task_num | dags
------------------------------+--------------------+---------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/dag1.py                      | 74.819988          |       1 |       21 | ['dag1']
/dag3.py                      | 53.193430000000006 |       1 |       17 | ['dag3']
/dag8.py                      | 34.535742          |       5 |       40 | ['dag8', 'dag8.subdag1', 'dag8.subdag2', 'dag8.subdag3', 'dag8.subdag4']
/dag4.py                      | 21.543944000000003 |       6 |       38 | ['dag9', 'dag9.subdag1', 'dag9.subdag2', 'dag9.subdag3', 'dag9.subdag4', 'dag9.subdag5']
/dag5.py                      | 18.458316000000003 |       3 |       16 | ['dag4', 'dag4.subdag1', 'dag4.subdag2']
/create_subdag_functions.py   | 14.652806000000002 |       0 |        0 | []
/dag7.py                      | 13.051984000000001 |       2 |        8 | ['dag11', 'dag11.subdag1']
/dag8.py                      | 10.02703           |       1 |       21 | ['dag5']
/dag9.py                      | 9.834226000000001  |       1 |        1 | ['dag10']
/dag10.py                     | 9.575258000000002  |       1 |       28 | ['dag2']
/dag11.py                     | 9.418897999999999  |       1 |        9 | ['dag6']
/dag12.py                     | 9.319210000000002  |       1 |        6 | ['dag12']
/dag13.py                     | 8.686964           |       1 |       26 | ['dag7']

Note: the example DAGs were removed from the second output for brevity.
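To compare parse times across runs, the report rows above can be turned into structured data. The following is a rough sketch that assumes the pipe-separated layout shown in the report; `parse_report_rows` is a hypothetical helper, not part of Airflow.

```python
import ast


def parse_report_rows(report_text):
    """Parse 'file | duration | dag_num | task_num | dags' rows into dicts."""
    rows = []
    for line in report_text.splitlines():
        parts = [part.strip() for part in line.split("|", 4)]
        # Skip separator lines, the header row, and anything that is not a data row.
        if len(parts) != 5 or parts[0] == "file":
            continue
        rows.append({
            "file": parts[0],
            "duration": float(parts[1]),
            "dag_num": int(parts[2]),
            "task_num": int(parts[3]),
            "dags": ast.literal_eval(parts[4]),  # the column is a Python list literal
        })
    return rows


if __name__ == "__main__":
    sample = """\
file                          | duration           | dag_num | task_num | dags
/dag1.py                      | 74.819988          |       1 |       21 | ['dag1']
/dag3.py                      | 53.193430000000006 |       1 |       17 | ['dag3']
/create_subdag_functions.py   | 14.652806000000002 |       0 |        0 | []
"""
    rows = parse_report_rows(sample)
    for row in sorted(rows, key=lambda r: r["duration"], reverse=True):
        print(f"{row['duration']:10.3f}  {row['file']}")
    print("sum of durations:", round(sum(r["duration"] for r in rows), 3))
```

Summing the 13 rows shown in the report gives about 287 s of the 296.58 s total parsing time, so almost all of the time is spent in our own files rather than in the omitted example DAGs.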

cProfile output of from airflow.models import DagBag; DagBag():

{{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=6740
{{__init__.py:51}} INFO - Using executor SequentialExecutor
{{models.py:273}} INFO - Filling up the DagBag from 

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      997  443.441    0.445  443.441    0.445 {built-in method io.open}
      198  186.978    0.944  483.629    2.443 zipfile.py:198(is_zipfile)
      642   65.069    0.101   65.069    0.101 {method 'close' of '_io.BufferedReader' objects}
     1351   45.924    0.034   45.946    0.034 <frozen importlib._bootstrap_external>:830(get_data)
     7916   39.403    0.005   39.403    0.005 {built-in method posix.stat}
      2/1   22.927   11.464  544.419  544.419 dag_processing.py:220(list_py_file_paths)
       33   18.992    0.576  289.797    8.782 models.py:321(process_file)
       22    8.723    0.397    8.723    0.397 {built-in method posix.scandir}
      412    2.379    0.006    2.379    0.006 {built-in method posix.listdir}
        9    1.301    0.145    3.058    0.340 linecache.py:82(updatecache)
 1682/355    0.186    0.000    0.731    0.002 sre_parse.py:470(_parse)
     1255    0.183    0.000    0.183    0.000 {built-in method marshal.loads}
 3092/325    0.143    0.000    0.647    0.002 sre_compile.py:64(_compile)
       59    0.139    0.002    0.139    0.002 {built-in method builtins.compile}
    25270    0.134    0.000    0.210    0.000 sre_parse.py:253(get)
    52266    0.132    0.000    0.132    0.000 {method 'append' of 'list' objects}
4210/4145    0.131    0.000    1.760    0.000 {built-in method builtins.__build_class__}
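In the profile above, io.open and zipfile.is_zipfile dominate, which points at slow file access during the folder scan rather than slow Python code in the DAGs. One way to check this outside Airflow is to walk the DAG folder and time the zip probe per file. The sketch below assumes, as the profile suggests, that the scan probes non-.py files with zipfile.is_zipfile; `scan_dag_folder` is a hypothetical helper, and only the folder path comes from the report.

```python
import os
import time
import zipfile
from collections import Counter


def scan_dag_folder(dag_folder):
    """Count files per extension and time is_zipfile() on each non-.py file."""
    counts = Counter()
    timings = []  # (seconds, path) for every non-.py file probed
    for root, _dirs, files in os.walk(dag_folder, followlinks=True):
        for name in files:
            path = os.path.join(root, name)
            ext = os.path.splitext(name)[1].lower() or "(none)"
            counts[ext] += 1
            if ext != ".py":
                start = time.perf_counter()
                zipfile.is_zipfile(path)  # opens and reads the file to check for a zip archive
                timings.append((time.perf_counter() - start, path))
    timings.sort(reverse=True)  # slowest files first
    return counts, timings


if __name__ == "__main__":
    counts, timings = scan_dag_folder("/usr/local/airflow/dags")
    print("files per extension:", dict(counts))
    for seconds, path in timings[:10]:
        print(f"{seconds:8.3f}s  {path}")
```

If individual probes take close to a second each, as the 0.944 s per call for is_zipfile in the profile suggests, the storage backing the DAG folder is the more likely bottleneck than the number of SQL files alone.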

Airflow performance drop:

[Image: Airflow performance drop]

