损坏的DAG:没有名为'气流分配'

2024-06-16 18:53:05 发布

您现在位置:Python中文网/ 问答频道 /正文

对Airflow/Python等非常陌生,但似乎无法解决此问题需要做什么。。在

空气在冰球/码头上流动

完全错误是:

损坏的DAG:[/usr/local/flow/dags/xxxx.py公司]没有名为'气流调节器.gsc_到\u gcs'

在python代码中,我写了:

来自气流调节器.gcs_to_gcs导入GoogleCloudStorageToGogleCloudStorageOperator

我想我需要安装gcs-to-gcs模块,但我不知道该怎么做。在

如有任何具体指示,我们将不胜感激:—)


Tags: tousrlocal错误flow空气airflow码头
2条回答

我知道这是一个老问题,但我只是试着使用相同的运算符,收到了相同的消息,因为cloudcomposer仍然不支持GoogleCloudStorageToGoogleCloudStorageOperator。在

我用一个简单的BashOperator解决了我所需要的

    from airflow.operators.bash_operator import BashOperator

with models.DAG(
            dag_name,
            schedule_interval=timedelta(days=1),
            default_args=default_dag_args) as dag:

        copy_files = BashOperator(
            task_id='copy_files',
            bash_command='gsutil -m cp <Source Bucket> <Destination Bucket>'
        )

非常简单,可以创建文件夹,如果你需要和重命名你的文件。在

GoogleCloudStorageToGoogleCloudStorageOperator在v1.9.0中不可用,因此您必须从here复制文件,从{a2}复制相关的钩子,并将其粘贴到python环境中相应位置的flow文件夹中。按照以下步骤操作:

运行以下代码以查找Apache Airflow在计算机上的存储位置:

pip show apache-airflow

它将在您的终端上产生以下输出:

^{pr2}$

位置后的路径:是您的Apache flow目录

现在克隆git repo以获取这两个文件:

# Clone the git repo to `airflow-temp` folder
git clone https://github.com/apache/incubator-airflow airflow-temp

# Copy the hook from the cloned repo to where Apache Airflow is located
# Replace LINK_TO_SITE_PACKAGES_DIR with the path you found above
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/hooks/

# For example: for me, it would be 
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/hooks/

# Do the same with operator file
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/operators/

# For example: for me, it would be 
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/operators/

重新运行气流webserverscheduler,现在应该可以工作了。在

相关问题 更多 >