我对使用Apache Airflow是新手。 我的dag的某些操作员处于失败状态。我正在努力理解错误的根源
以下是问题的详细信息:
我的dag相当大,它的某些部分由子dag组成。
我在Composer UI中注意到的是,失败的子DAG都是在名为download_file
的task_id
中执行的,该XCom
与GoogleCloudStorageDownloadOperator
一起使用
>> GoogleCloudStorageDownloadOperator(
task_id='download_file',
bucket="sftp_sef",
object="{{task_instance.xcom_pull(task_ids='find_file') | first }}",
filename="/home/airflow/gcs/data/zips/{{{{ds_nodash}}}}_{0}.zip".format(table)
)
上述子数据集中的日志没有显示任何有用的内容
日志:
[2020-04-07 15:19:25,618] {models.py:1359} INFO - Dependencies all met for [2020-04-07 15:19:25,660] {models.py:1359} INFO - Dependencies all met for [2020-04-07 15:19:25,660] {models.py:1577} INFO -
------------------------------------------------------------------------------- Starting attempt 10 of 1
[2020-04-07 15:19:25,685] {models.py:1599} INFO - Executing on 2020-04-06T11:44:31+00:00 [2020-04-07 15:19:25,685] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run datamart_integration.consentement_email download_file 2020-04-06T11:44:31+00:00 --job_id 156313 --pool integration --raw -sd DAGS_FOLDER/datamart/datamart_integration.py --cfg_path /tmp/tmpacazgnve']
我不确定是否有什么地方我没有检查。。。以下是我的问题:
关于你的三个问题:
首先使用Cloud Composer时,有几种方法可以调试代码中的错误。根据documentation,你应该:
这些日志与单个DAG任务相关。可以在云存储的日志文件夹和Web界面中查看它们
创建Cloud Composer环境时,还将创建一个云存储桶并与之关联。因此,Cloud Composer将单个DAG任务的日志存储在此bucket内的logs文件夹中,每个工作流文件夹都有一个用于其DAG和子DAG的文件夹。您可以检查它的结构here
关于Airflow web界面,它每60秒刷新一次。此外,您可以查看更多有关它的信息here
您可以将Cloud Monitoring和Cloud Logging与Cloud Composer一起使用。云监控提供了云驱动应用程序的性能和整体健康状况的可视性,而云日志显示了调度器和工作容器生成的日志。因此,根据您的需要,您可以同时使用两者或仅使用您认为更有用的一种
因此,在对DAG进行故障排除时,建议执行以下步骤
其次关于测试和调试,建议您将生产环境和测试环境分开,以避免DAG干扰
此外,可以在本地测试DAG,文档中有一个关于此主题的教程here。本地测试允许您识别语法和任务错误。但是,我必须指出,不可能检查/评估依赖关系和与数据库的通信
第三个,一般来说,为了验证Xcom中的错误,您应该检查:
我想指出的是,根据this documentation,到Google CloudStorageDownloadOperator的路径已更新为GCSToLocalOperator
此外,我还鼓励您查看以下内容:code和documentation以检查Xcom语法和错误
如果需要进一步帮助,请随时与我共享错误代码
相关问题 更多 >
编程相关推荐