last_updated_at = "{{ti.xcom_pull(task_ids='Match_Updated_date_{}',
key='QueryTimeStamp_{}')}}".format(country,country)
这是我在xcom中保存的值
where last_updated_utc > '{}';
并在where子句中使用它
但是xcom被正确地传递到我的where子句中
where last_updated_utc > '{ti.xcom_pull(task_ids='Match_Updated_date_mm',
key='QueryTimeStamp_mm')}';
它传递整个字符串,我如何解决这个问题
.format(last_updated_at)
这就是我在where子句中传递它的方式
当我没有使用
my_xcom_value = "{{ti.xcom_pull(task_ids='Match_Updated_date',
key='QueryTimeStamp')}}"
Xcom工作得很好。但当我传递参数时,它不再工作了
Python可调用函数推动xcom
def match_dates(**Kwargs):
try:
print("enters the try block")
response = s3.get_object(Bucket='mygluecrawlerbucket',Key='DateTime/Users/my_date_{}.txt'.format(Kwargs['key1']))
print("responce is ", response)
status = response['ResponseMetadata']['HTTPStatusCode']
if status == 200:
print("Enters the status block ")
data = response['Body'].read().decode("utf-8")
ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=someVariable)
拉xcom的PostGresOperator
import_redshift_table_zm = PostgresOperator(
task_id='copy_data_from_redshift_zm',
postgres_conn_id='postgres_default',
sql="""
BEGIN;
create table angaza_public_spark.stag_angaza_users_zm as
Select * FROM angaza_public_zm.users
where last_updated_utc > '{}';
END;
""".format("{{ti.xcom_pull(task_ids='Match_Updated_dates_zm', key='QueryTimeStamp_{}')}}".format(country))
有两件事
有了这个(注意
{{
之后和}}
之前的空格)last_updated_at
参数的位置。气流中的每个运算符都有一个名为template_fields
的类变量如果您使用的是自定义运算符,请确保将
last_updated_at
添加为template_fields
的一部分例如:
相关问题 更多 >
编程相关推荐