适用于Amazon Athena的符合Python DB API 2.0(PEP 249)的客户端

lambda-pyathena的Python项目详细描述


https://img.shields.io/pypi/pyversions/PyAthena.svghttps://travis-ci.org/laughingman7743/PyAthena.svg?branch=masterhttps://codecov.io/gh/laughingman7743/PyAthena/branch/master/graph/badge.svghttps://img.shields.io/pypi/l/PyAthena.svghttps://img.shields.io/pypi/dm/PyAthena.svg

皮雅典娜

pyathena是一个符合pythonDB API 2.0 (PEP 249)的客户端,用于Amazon Athena

雅典娜羔羊

lambda pyathena是一个pyathena分支,它只需要从安装中删除boto3和botocore, 产生了一个aws lambda友好包。

要求

  • Python
  • 第2、7、3.5、3.6、3.7节

安装

$ pip install lambda-pyathena

额外套餐:

PackageInstall commandVersion
Pandas^{tt1}$>=0.24.0
SQLAlchemy^{tt2}$>=1.0.0, <1.3.0

使用量

基本用法

frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM one_row")print(cursor.description)print(cursor.fetchall())

光标迭代
frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM many_rows LIMIT 10")forrowincursor:print(row)

使用参数查询

支持的DB API paramstyle仅为PyFormatPyFormat只支持named placeholders旧的%运算符样式,参数指定字典格式。

frompyathenaimportconnectcursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("""
               SELECT col_string FROM one_row_complex
               WHERE col_string = %(param)s
               """,{'param':'a string'})print(cursor.fetchall())

如果查询中包含%字符,则必须使用%%进行转义,如下所示:

SELECTcol_stringFROMone_row_complexWHEREcol_string=%(param)sORcol_stringLIKE'a%%'

sql炼金术

使用pip install "SQLAlchemy>=1.0.0, <1.3.0"pip install PyAthena[SQLAlchemy]安装sqlalchemy。 支持的sqlalchemy为1.0.0或更高版本,小于1.3.0。

fromurllib.parseimportquote_plus# PY2: from urllib import quote_plusfromsqlalchemy.engineimportcreate_enginefromsqlalchemy.sql.expressionimportselectfromsqlalchemy.sql.functionsimportfuncfromsqlalchemy.sql.schemaimportTable,MetaDataconn_str='awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'engine=create_engine(conn_str.format(aws_access_key_id=quote_plus('YOUR_ACCESS_KEY_ID'),aws_secret_access_key=quote_plus('YOUR_SECRET_ACCESS_KEY'),region_name='us-west-2',schema_name='default',s3_staging_dir=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))many_rows=Table('many_rows',MetaData(bind=engine),autoload=True)print(select([func.count('*')],from_obj=many_rows).scalar())

连接字符串的格式如下:

awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...

如果您没有使用实例配置文件或boto3配置文件指定aws_access_key_idaws_secret_access_key

awsathena+rest://:@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...

注意:s3_staging_dir需要引号。如果aws_access_key_idaws_secret_access_key和其他参数包含特殊字符,则还需要引号。

熊猫

熊猫数据帧的最小示例:

frompyathenaimportconnectimportpandasaspdconn=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2')df=pd.read_sql("SELECT * FROM many_rows",conn)print(df.head())

作为熊猫数据帧:

frompyathenaimportconnectfrompyathena.utilimportas_pandascursor=connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor()cursor.execute("SELECT * FROM many_rows")df=as_pandas(cursor)print(df.describe())

如果要直接使用pandasDataFrame object,可以使用PandasCursor

异步光标

AsynchronousCursor是一个使用concurrent.futures包的简单实现。 python 2.7使用backport of the concurrent.futures包。 此游标不符合DB API 2.0 (PEP 249)

可以通过指定^{tt15}来使用AsynchronousCursor$ 使用连接方法或连接对象。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()
frompyathena.connectionimportConnectionfrompyathena.async_cursorimportAsyncCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()

在调用连接对象的cursor方法时,还可以通过指定cursor类来使用它。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(AsyncCursor)
frompyathena.connectionimportConnectionfrompyathena.async_cursorimportAsyncCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(AsyncCursor)

默认工作线程数为5或CPU数*5。 如果要更改工人数量,可以按如下方式指定。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor(max_workers=10)

asynchronouscursor的execute方法返回查询id和future object的元组。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")

future object的返回值是一个AthenaResultSet对象。 这个对象有一个接口,可以获取和迭代查询结果,类似于同步游标。 它还包含有关查询执行结果的信息。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")result_set=future.result()print(result_set.state)print(result_set.state_change_reason)print(result_set.completion_date_time)print(result_set.submission_date_time)print(result_set.data_scanned_in_bytes)print(result_set.execution_time_in_millis)print(result_set.output_location)print(result_set.description)forrowinresult_set:print(row)
frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")result_set=future.result()print(result_set.fetchall())

使用异步游标取消查询需要查询id。

frompyathenaimportconnectfrompyathena.async_cursorimportAsyncCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=AsyncCursor).cursor()query_id,future=cursor.execute("SELECT * FROM many_rows")cursor.cancel(query_id)

注意:future object的cancel方法不会取消查询。

pandascursor

pandascursor直接处理输出到s3的查询执行结果的csv文件。 此光标用于在执行查询后下载csv文件,然后加载到DataFrame object。 性能比用游标获取数据要好。

可以通过指定^{tt15}来使用pandascursor$ 使用连接方法或连接对象。

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()
frompyathena.connectionimportConnectionfrompyathena.pandas_cursorimportPandasCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()

在调用连接对象的cursor方法时,还可以通过指定cursor类来使用它。

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(PandasCursor)
frompyathena.connectionimportConnectionfrompyathena.pandas_cursorimportPandasCursorcursor=Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2').cursor(PandasCursor)

as-pandas方法返回DataFrame object

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()df=cursor.execute("SELECT * FROM many_rows").as_pandas()print(df.describe())print(df.head())

支持获取和迭代查询结果。

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")print(cursor.fetchone())print(cursor.fetchmany())print(cursor.fetchall())
frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")forrowincursor:print(row)

雅典娜数据类型的日期和时间戳返回为pandas.Timestamp类型。

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT col_timestamp FROM one_row_complex")print(type(cursor.fetchone()[0]))# <class 'pandas._libs.tslibs.timestamps.Timestamp'>

还可以检索查询的执行信息。

frompyathenaimportconnectfrompyathena.pandas_cursorimportPandasCursorcursor=connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',region_name='us-west-2',cursor_class=PandasCursor).cursor()cursor.execute("SELECT * FROM many_rows")print(cursor.state)print(cursor.state_change_reason)print(cursor.completion_date_time)print(cursor.submission_date_time)print(cursor.data_scanned_in_bytes)print(cursor.execution_time_in_millis)print(cursor.output_location)

注意:pandascursor处理内存中的csv文件。注意记忆容量。

证书

支持Boto3 credentials

附加环境变量:

$ exportAWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/

测试

取决于以下环境变量:

$ exportAWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ exportAWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ exportAWS_DEFAULT_REGION=us-west-2
$ exportAWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
运行测试
$ pip install pipenv
$ pipenv install --dev
$ pipenv run scripts/test_data/upload_test_data.sh
$ pipenv run pytest
$ pipenv run scripts/test_data/delete_test_data.sh

运行测试多个python版本
$ pip install pipenv
$ pipenv install --dev
$ pipenv run scripts/test_data/upload_test_data.sh
$ pyenv local3.7.2 3.6.8 3.5.7 2.7.16
$ pipenv run tox
$ pipenv run scripts/test_data/delete_test_data.sh

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何正确测试和调试使用照相机闪光灯的应用程序?   组织。jboss。放松点。spi。未处理的异常:java。lang.NoSuchMethodError:org。冬眠SessionFactory。openSession()Lorg/hibernate/Session;   在同一行上声明多个Java数组?   java Spring批处理管理员,无法替换占位符“批处理”。商业模式。脚本'   使用JQuery的网站上出现java HtmlUnit“不支持浏览器”错误   java JavaFX如何将图形“裁剪”到按钮   java处理mysql中包含逗号的数字   java Hibernate语法错误:应为点   如何根据给定的日期在java中获取30天的回溯日期   java Servlet URL映射   线程“awteventque0”java中的多线程java JFrame异常。util。EmptyStackException,即使堆栈先初始化   JavaSpring控制器/组件实现可序列化   java如何在游戏完成时启动带有按钮的对话框?   java Hibernate双向多对多实现   如何使用Xpath Java修改XML中的属性值,包括注释部分   java Mockito模拟构造函数示例   java如何在不重写的情况下写入文本文件?