将数据从Amazon s3复制到redshi

2024-09-29 17:18:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图使用airflow将数据从S3 bucket复制到Redshift Database,下面是我的代码:

from airflow.hooks import PostgresHook
path = 's3://my_bucket/my_file.csv'

redshift_hook = PostgresHook(postgres_conn_id='table_name')
access_key='abcd' 
secret_key='aaaa'
query= """
copy my_table 
FROM '%s' 
ACCESS_KEY_ID '%s' 
SECRET_ACCESS_KEY '%s' 
REGION 'eu-west-1' 
ACCEPTINVCHARS 
IGNOREHEADER 1 
FILLRECORD 
CSV
BLANKSASNULL 
EMPTYASNULL 
MAXERROR 100 
DATEFORMAT 'MM/DD/YYYY'
""" % ( path,
        access_key,
        secret_key) 

redshift_hook.run(query)

但当我运行此脚本时,它会引发以下错误:

^{pr2}$

请帮我拿一下这个好吗? 提前谢谢你。在


Tags: 数据pathkeyredshiftsecretbucketaccessmy
1条回答
网友
1楼 · 发布于 2024-09-29 17:18:38

您的连接标识与表名相同? 您需要转到位于http://………/admin/connections/的airflow ui,并为redshift集群添加一个postgres连接id。现在把您写table_name的连接id的名称。在

当你在它的时候,定义一个s3连接并把访问和密钥放在那里。通过按连接id名称实例化SSHHook来加载它,然后从中获取密钥。在

最后,将您的…run(query)替换为^{}。将键放入参数dict中,然后在SQL字符串中使用:

from airflow.operators import PostgresOperator
form airflow.hooks import S3Hook

s3 = S3hook(aws_conn_id="s3_conn_id_in_airflow_ui_man") 
redshift_load_task = PostgresOperator("""
copy my_table 
FROM '{{ params.source }}' 
ACCESS_KEY_ID '{{ params.access_key}}' 
SECRET_ACCESS_KEY '{{ params.secret_key }}' 
REGION 'eu-west-1' 
ACCEPTINVCHARS 
IGNOREHEADER 1 
FILLRECORD 
CSV
BLANKSASNULL 
EMPTYASNULL 
MAXERROR 100 
DATEFORMAT 'MM/DD/YYYY'
""",
postgres_conn_id="redshift_conn_id_in_airflow_ui_man",
database="uh_you_tell_me",
params={
    'source': 's3://my_bucket/my_file.csv',
    'access_key': s3.get_credentials().access_key,
    'secret_key': s3.get_credentials().secret_key,
},
)

相关问题 更多 >

    热门问题