我在google composer(flow)中有一个DAG导入:
从气流调节传感器.gcs_传感器导入GoogleCloudStorageObjectSensor
当我运行DAG时,我遇到了一个错误:
“ImportError:没有命名的模块传感器底座传感器操作员““
基本上,我想在做其他事情之前检查一个文件是否存在于bucket中。在
下面是完整的python代码:
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import
GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import
GoogleCloudStorageObjectSensor
CONNECTION_ID = 'something'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 8, 21, 0, 0),
}
def print_hello():
return 'youtube folder exists!!'
with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2',
schedule_interval=timedelta(days=1),
default_args=default_args ) as dag:
gcp_sensorBucket=GoogleCloudStorageObjectSensor(
task_id='gcp_sensorbucket',
bucket='/aa_youtube_new/2018/06/04/',
#bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
object='*.csv',
google_cloud_conn_id=CONNECTION_ID
)
hello_operator = PythonOperator(task_id='hello_task',
python_callable=print_hello)
hello_operator.set_upstream(gcp_sensorBucket)
我看到import from base_sensor_操作符被添加到version 1.10 of Airflow中。这在版本1.8和1.9中不存在。相反,导入操作如下:
所以,检查传感器,相关的sensor for Airflow 1.10在传感器下面,但是对于气流1.9和1.8,传感器在operators下。在
所以,这个问题似乎与Composer和flow的版本控制有关,但是flow1.10在8月份不适用于Composer。实际上,这个问题是在Google's Issue Tracker上报告的,其响应是通过使用apacheflow版本1.10来解决这个问题,该版本可以包含在version 1.3 of Composer上。在
你用的是哪个作曲家版本?我将您的DAG代码复制并粘贴到一个新创建的Composer环境中composer-1.1.0-airflow-1.9.0,这对我来说是开箱即用的。在
以下是截图:
相关问题 更多 >
编程相关推荐