适用于Amazon Athena的符合Python DB API 2.0(PEP 249)的客户端
lambda-pyathena的Python项目详细描述
皮雅典娜
pyathena是一个符合pythonDB API 2.0 (PEP 249)的客户端,用于Amazon Athena。
雅典娜羔羊
lambda pyathena是一个pyathena分支,它只需要从安装中删除boto3和botocore, 产生了一个aws lambda友好包。
要求
- Python
- 第2、7、3.5、3.6、3.7节
安装
$ pip install lambda-pyathena
额外套餐:
Package | Install command | Version |
---|---|---|
Pandas | ^{tt1}$ | >=0.24.0 |
SQLAlchemy | ^{tt2}$ | >=1.0.0, <1.3.0 |
使用量
基本用法
frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM one_row")print(cursor.description)print(cursor.fetchall())
光标迭代
frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM many_rows LIMIT 10")forrowincursor:print(row)
使用参数查询
支持的DB API paramstyle仅为PyFormat。 PyFormat只支持named placeholders旧的%运算符样式,参数指定字典格式。
frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute(""" SELECT col_string FROM one_row_complex WHERE col_string = %(param)s """,{'param':'a string'})print(cursor.fetchall())
如果查询中包含%字符,则必须使用%%进行转义,如下所示:
SELECTcol_stringFROMone_row_complexWHEREcol_string=%(param)sORcol_stringLIKE'a%%'
sql炼金术
使用pip install "SQLAlchemy>=1.0.0, <1.3.0"或pip install PyAthena[SQLAlchemy]安装sqlalchemy。 支持的sqlalchemy为1.0.0或更高版本,小于1.3.0。
fromurllib.parseimportquote_plus# PY2: from urllib import quote_plusfromsqlalchemy.engineimportcreate_enginefromsqlalchemy.sql.expressionimportselectfromsqlalchemy.sql.functionsimportfuncfromsqlalchemy.sql.schemaimportTable,MetaDataconn_str='awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\ '{schema_name}?s3_staging_dir={s3_staging_dir}'engine=create_engine(conn_str.format(aws_access_key_id=quote_plus('YOUR_ACCESS_KEY_ID'),aws_secret_access_key=quote_plus('YOUR_SECRET_ACCESS_KEY'),region_name='us-west-2',schema_name='default',s3_staging_dir=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))many_rows=Table('many_rows',MetaData(bind=engine),autoload=True)print(select([func.count('*')],from_obj=many_rows).scalar())
连接字符串的格式如下:
awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...
如果您没有使用实例配置文件或boto3配置文件指定aws_access_key_id和aws_secret_access_key:
awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...
注意:s3_staging_dir需要引号。如果aws_access_key_id、aws_secret_access_key和其他参数包含特殊字符,则还需要引号。
熊猫数据帧的最小示例:
frompyathenaimportconnectimportpandasaspdconn=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2')df=pd.read_sql("SELECT * FROM many_rows",conn)print(df.head())
作为熊猫数据帧:
frompyathenaimportconnectfrompyathena.utilimportas_pandascursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM many_rows")df=as_pandas(cursor)print(df.describe())
如果要直接使用pandasDataFrame object,可以使用PandasCursor。
AsynchronousCursor是一个使用concurrent.futures包的简单实现。 python 2.7使用backport of the concurrent.futures包。 此游标不符合DB API 2.0 (PEP 249)。
可以通过指定^{tt15}来使用AsynchronousCursor$ 使用连接方法或连接对象。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()
frompyathena.connectionimportConnectionfrompyathena.async_cursorimportAsyncCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()
在调用连接对象的cursor方法时,还可以通过指定cursor类来使用它。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(AsyncCursor)
frompyathena.connectionimportConnectionfrompyathena.async_cursorimportAsyncCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(AsyncCursor)
默认工作线程数为5或CPU数*5。 如果要更改工人数量,可以按如下方式指定。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor(max_workers=10)
asynchronouscursor的execute方法返回查询id和future object的元组。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")
future object的返回值是一个AthenaResultSet对象。 这个对象有一个接口,可以获取和迭代查询结果,类似于同步游标。 它还包含有关查询执行结果的信息。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")result_set=future.result()print(result_set.state)print(result_set.state_change_reason)print(result_set.completion_date_time)print(result_set.submission_date_time)print(result_set.data_scanned_in_bytes)print(result_set.execution_time_in_millis)print(result_set.output_location)print(result_set.description)forrowinresult_set:print(row)
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")result_set=future.result()print(result_set.fetchall())
使用异步游标取消查询需要查询id。
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")cursor.cancel(query_id)
注意:future object的cancel方法不会取消查询。
pandascursor
pandascursor直接处理输出到s3的查询执行结果的csv文件。 此光标用于在执行查询后下载csv文件,然后加载到DataFrame object。 性能比用游标获取数据要好。
可以通过指定^{tt15}来使用pandascursor$ 使用连接方法或连接对象。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()
frompyathena.connectionimportConnectionfrompyathena.pandas_cursorimportPandasCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()
在调用连接对象的cursor方法时,还可以通过指定cursor类来使用它。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(PandasCursor)
frompyathena.connectionimportConnectionfrompyathena.pandas_cursorimportPandasCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(PandasCursor)
as-pandas方法返回DataFrame object。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()df=cursor.execute("SELECT * FROM many_rows").as_pandas()print(df.describe())print(df.head())
支持获取和迭代查询结果。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")print(cursor.fetchone())print(cursor.fetchmany())print(cursor.fetchall())
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")forrowincursor:print(row)
雅典娜数据类型的日期和时间戳返回为pandas.Timestamp类型。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT col_timestamp FROM one_row_complex")print(type(cursor.fetchone()[0]))# <class 'pandas._libs.tslibs.timestamps.Timestamp'>
还可以检索查询的执行信息。
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")print(cursor.state)print(cursor.state_change_reason)print(cursor.completion_date_time)print(cursor.submission_date_time)print(cursor.data_scanned_in_bytes)print(cursor.execution_time_in_millis)print(cursor.output_location)
注意:pandascursor处理内存中的csv文件。注意记忆容量。
测试
取决于以下环境变量:
$ exportAWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID $ exportAWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY $ exportAWS_DEFAULT_REGION=us-west-2 $ exportAWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
$ pip install pipenv $ pipenv install --dev $ pipenv run scripts/test_data/upload_test_data.sh $ pipenv run pytest $ pipenv run scripts/test_data/delete_test_data.sh