气流:如何在自定义运算符上访问{{execution date}}?

2024-05-11 21:06:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Airflow上构建了一个自定义操作符,它调用API获取数据,然后将数据写入BigQuery。但是,问题是我必须将execution_date宏作为API参数传递,以便能够调用该日期的数据。遗憾的是,当我尝试这样做时,我的操作员无法解析我传递的jinja模板。当我检查我为此做的日志记录时,它只显示如下图所示的模板。 我希望你们能帮忙

Airflow Logs

这是自定义运算符和dag的代码。谢谢

                 ...
class MyOperator(BaseOperator):

     def __init__(self,date):
          super(MyOperator,self).__init__(*arg,**kwargs)

          self.date = date

     def __pull_from_api(self):
          api_link = "somelink.com/api/date={}".format(self.date)
          data = request.get(api_link).json()
          return data

     def execute(self,context):
          data = self.__pull_from_api()

                 ...


dag = DAG('My Pipeline', default_args=default_args)

t1 = MyOperator(date='{{ execution_date}}', task_id='my_pipeline_1', dag=dag)
t1


Tags: 数据fromself模板apidatadateinit
2条回答

我鼓励大家看看PythonOperatorcontext dictionary。在Composer中使用变量时,这两种方法都很有用

首先,由于您已经有了自定义操作符,我强烈建议您查看一下here。有一些常用的宏和模板使用气流。因此,您可以找出您可能有错误的地方

最好的方法是从上下文中获取执行日期,该日期已经传递给execute(self,context)

例如,我在这里设置执行日期的字符串表示形式:

self.execution_date_str = context["execution_date"].strftime("%Y-%m-%d")

Example in LatestOnlyOperator

相关问题 更多 >