googlecloud组件中气流DAG导入错误的获取

2024-10-01 11:35:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我在google composer(flow)中有一个DAG导入:

从气流调节传感器.gcs_传感器导入GoogleCloudStorageObjectSensor

当我运行DAG时,我遇到了一个错误:

“ImportError:没有命名的模块传感器底座传感器操作员““

基本上,我想在做其他事情之前检查一个文件是否存在于bucket中。在

下面是完整的python代码:

from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import 
GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import 
GoogleCloudStorageObjectSensor


CONNECTION_ID = 'something'

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 8, 21, 0, 0),
}

def print_hello():
return 'youtube folder exists!!'

with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2', 
schedule_interval=timedelta(days=1), 
 default_args=default_args ) as dag: 

 gcp_sensorBucket=GoogleCloudStorageObjectSensor(
 task_id='gcp_sensorbucket',
 bucket='/aa_youtube_new/2018/06/04/', 
 #bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
 object='*.csv', 
 google_cloud_conn_id=CONNECTION_ID

)
hello_operator = PythonOperator(task_id='hello_task', 
python_callable=print_hello)
hello_operator.set_upstream(gcp_sensorBucket)

Tags: fromimportdefaulthellodatetimebucketargs传感器
2条回答

我看到import from base_sensor_操作符被添加到version 1.10 of Airflow中。这在版本1.81.9中不存在。相反,导入操作如下:

from airflow.operators.sensors import BaseSensorOperator

所以,检查传感器,相关的sensor for Airflow 1.10在传感器下面,但是对于气流1.9和1.8,传感器在operators下。在

所以,这个问题似乎与Composer和flow的版本控制有关,但是flow1.10在8月份不适用于Composer。实际上,这个问题是在Google's Issue Tracker上报告的,其响应是通过使用apacheflow版本1.10来解决这个问题,该版本可以包含在version 1.3 of Composer上。在

你用的是哪个作曲家版本?我将您的DAG代码复制并粘贴到一个新创建的Composer环境中composer-1.1.0-airflow-1.9.0,这对我来说是开箱即用的。在

以下是截图: enter image description here

相关问题 更多 >