气流插件运算符未运行执行方法

2024-10-01 22:26:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试创建我的第一个airflow插件,一个操作符,但我无法使execute方法正常运行。在

我已经找到了一个很好的例子来说明如何构造插件模块并将其导入气流。在

结构和代码如下:

airflow
├── plugins
  ├── generic_plugin
    ├── __init__.py
    └── operators
        ├── __init__.py
        └── generic_operator.py

气流/插件/通用插件/初始化

^{pr2}$

气流/插件/generic_plugin/operators/generic_运算符.py在

from airflow.models import BaseOperator
import json


class GenericOperator(BaseOperator):
    def __init__(self,
                 data,
                 destination_path,
                 *args,
                 **kwargs):
        super(GenericOperator, self).__init__(*args, **kwargs)
        self.data = data
        self.destination_path = destination_path

    def execute(self, context):
        self.dump_data(
            data=self.data,
            destination_path=self.destination_path)

    def dump_data(self, data, destination_path):
        with open(destination_path, 'w') as out_file:
            json.dump(data, out_file)

在python控制台中运行这段代码,可以正确导入所有内容,但execute方法无法运行。在

from airflow.operators.generic_plugin import GenericOperator

generic_op = GenericOperator(
    task_id='gen_test',
    data={'some_json_data': [
        {'id': 1, 'value': 10}, {'id': 2, 'value': 35}]},
    destination_path='./some_json_data.json')

但是,如果我手动运行dump_data方法,它会按预期工作:

generic_op.dump_data(
    data=generic_op.data,
    destination_path=generic_op.destination_path)

我是在如何设置execute方法时遗漏了什么呢?或者execute方法不是应该这样工作的吗?在


Tags: path方法pyself插件jsonexecutedata

热门问题