我正在尝试向Google Cloud Composer(气流)添加一个自定义操作符,但它似乎找不到该操作符。我在这方面花了不少时间,并尝试:
我已经修改了示例中的代码,以尝试获取运算符
dags/my_dag.py
import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END dag_imports]
# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]
# [START dag]
dag1 = DAG('backup_vm_instance',
description='Backup a Compute Engine instance using an Airflow DAG',
schedule_interval=INTERVAL,
start_date=START_DATE,
catchup=False)
# [END dag]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]
# Airflow DAG definition
begin >> stop_instance
插件/my_operator.py
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]
class StopInstanceOperator(BaseOperator):
"""Stops the virtual machine instance."""
@apply_defaults
def __init__(self, project, zone, instance, *args, **kwargs):
self.project = project
self.zone = zone
self.instance = instance
super(StopInstanceOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info("Hello world")
# [END stop_oper_no_xcom]
class GoogleComputeEnginePlugin(AirflowPlugin):
"""Expose Airflow operators."""
name = 'gce_commands_plugin'
operators = [StopInstanceOperator]
这是代码/结构,我得到的错误是
Broken DAG: [/home/airflow/gcs/dags/my_dag.py] No module named 'airflow.operators.my_operator'
我已经测试了您的Composer文件,在清理代码之后,它似乎工作得很好
首先,请确保您被授予以下permissions权限,这些权限是添加和更新插件所必需的:
从
plugins/my_operator.py
开始:代码是正确的,您需要注意
name = 'gce_commands_plugin'
变量,该变量为插件提供内部名称gce_commands_plugin
(因此您可以在DAG文件中引用它),并向其添加一个运算符(StopInstanceOperator
)然后,我们有
dags/my_dag.py
:我在这里注意到:
您试图引用插件的文件名,有什么问题吗。必须从
my_operator.py
文件中引用name = 'gce_commands_plugin'
变量:您不能使用以下代码片段:
这是因为您决定不使用
dummy_operator
,它定义了begin
变量当您的文件准备好后,您可以将它们复制到Composer的bucket中,并在UI中看到积极的结果:
我希望有帮助
相关问题 更多 >
编程相关推荐