气流:同一个操作员实例是否可以重复使用并多次执行,以保持运行之间的状态?

2024-05-12 10:20:20 发布

您现在位置:Python中文网/ 问答频道 /正文

您能否解释一下,在某些情况下,运算符实例是否可以重用,并且execute()方法将多次执行,并且状态在execute()运行之间保持

换句话说,在气流中是否可能出现这种情况

  1. 运算符中的自变量在init中初始化

  2. execute()方法读取自变量并对其进行更改

  3. execute()在同一个操作符实例上运行一次,例如,由于重新启动或其他原因 并且可以读取由上一次执行运行更改的自变量吗

    class MyOperator(BaseOperator):
    
      def __init__(self,
                 param_1
                 ...
                 param_n):
    
          self.var1=param_1
    
      def execute(self, context):
          #do some logic with self variable
          self.var1  += 1 #
    

Tags: 实例方法selfexecuteparaminit状态def
1条回答
网友
1楼 · 发布于 2024-05-12 10:20:20

由于以下原因,您所描述的场景不可能实现

当Airflow Scheduler将任务实例分派到队列时,工作进程中的每个心跳都会初始化该任务

这是因为填充DagBag的每个心跳都会初始化操作符实例

运行之间存储的任何值在重新初始化时都会重置

如果需要在两次运行之间存储值,可以使用Variable模型来存储这些值

from airflow.models import Variable

def execute(self, context):
    #do some logic with self variable 
    var1 = Variable.get(
            "count", 
            deserialize_json=True,
            default_var=0
        )
    var1 += 1    
    Variable.set("count", var1)

相关问题 更多 >