如何在气流中将值传递给Xcom

2024-06-13 18:54:00 发布

您现在位置:Python中文网/ 问答频道 /正文

last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}')}}".format(country,country) 

这是我在xcom中保存的值

where last_updated_utc > '{}';

并在where子句中使用它

但是xcom被正确地传递到我的where子句中

 where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm', 
 key='QueryTimeStamp_mm')}';

它传递整个字符串,我如何解决这个问题

.format(last_updated_at)

这就是我在where子句中传递它的方式

当我没有使用

 my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date', 
 key='QueryTimeStamp')}}"

Xcom工作得很好。但当我传递参数时,它不再工作了

Python可调用函数推动xcom

def match_dates(**Kwargs):     
try:     
print("enters the try block")     
response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
print("responce is ", response)
status = response['ResponseMetadata']['HTTPStatusCode']
if status == 200:
print("Enters the status block ")
data = response['Body'].read().decode("utf-8")     
ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)

拉xcom的PostGresOperator

import_redshift_table_zm = PostgresOperator(
        task_id='copy_data_from_redshift_zm',
        postgres_conn_id='postgres_default',
        sql="""
                BEGIN;

            create table angaza_public_spark.stag_angaza_users_zm as
            Select * FROM angaza_public_zm.users
            where last_updated_utc > '{}';
              END;
                   
    """.format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))


Tags: keyidsformattaskdatematchtiwhere
1条回答
网友
1楼 · 发布于 2024-06-13 18:54:00

有两件事

  1. 替换这个
{{ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}')}}

有了这个(注意{{之后和}}之前的空格)

{{ ti.xcom_pull(task_ids='Match_Updated_date_{}', 
 key='QueryTimeStamp_{}') }}
  1. 取决于使用last_updated_at参数的位置。气流中的每个运算符都有一个名为template_fields的类变量

如果您使用的是自定义运算符,请确保将last_updated_at添加为template_fields的一部分

例如:

template_fields = ('templates_dict', 'op_args', 'op_kwargs')

相关问题 更多 >