我正在尝试创建我的第一个airflow插件,一个操作符,但我无法使execute方法正常运行。在
我已经找到了一个很好的例子来说明如何构造插件模块并将其导入气流。在
结构和代码如下:
airflow
├── plugins
├── generic_plugin
├── __init__.py
└── operators
├── __init__.py
└── generic_operator.py
气流/插件/通用插件/初始化
^{pr2}$气流/插件/generic_plugin/operators/generic_运算符.py在
from airflow.models import BaseOperator
import json
class GenericOperator(BaseOperator):
def __init__(self,
data,
destination_path,
*args,
**kwargs):
super(GenericOperator, self).__init__(*args, **kwargs)
self.data = data
self.destination_path = destination_path
def execute(self, context):
self.dump_data(
data=self.data,
destination_path=self.destination_path)
def dump_data(self, data, destination_path):
with open(destination_path, 'w') as out_file:
json.dump(data, out_file)
在python控制台中运行这段代码,可以正确导入所有内容,但execute方法无法运行。在
from airflow.operators.generic_plugin import GenericOperator
generic_op = GenericOperator(
task_id='gen_test',
data={'some_json_data': [
{'id': 1, 'value': 10}, {'id': 2, 'value': 35}]},
destination_path='./some_json_data.json')
但是,如果我手动运行dump_data方法,它会按预期工作:
generic_op.dump_data(
data=generic_op.data,
destination_path=generic_op.destination_path)
我是在如何设置execute方法时遗漏了什么呢?或者execute方法不是应该这样工作的吗?在
目前没有回答
相关问题 更多 >
编程相关推荐