A DBAPI 2.0 interface and SQLAlchemy dialect for Databricks interactive clusters.

databricks-dbapi

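A thin wrapper around pyhive for creating a DBAPI connection to an interactive Databricks cluster.

Also provides a SQLAlchemy dialect for Databricks interactive clusters.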

Installation

Install using pip:

    pip install databricks-dbapi

For SQLAlchemy support, install with:

    pip install databricks-dbapi[sqlalchemy]

Usage

The connect() function returns a pyhive Hive connection object, which internally wraps a thrift connection.

Using a Databricks API token (recommended):

    import os

    from databricks_dbapi import databricks

    token = os.environ["DATABRICKS_TOKEN"]
    host = os.environ["DATABRICKS_HOST"]
    cluster = os.environ["DATABRICKS_CLUSTER"]

    connection = databricks.connect(
        host=host,
        cluster=cluster,
        token=token,
    )
    cursor = connection.cursor()

    cursor.execute("SELECT * FROM some_table LIMIT 100")

    print(cursor.fetchone())
    print(cursor.fetchall())
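Since the cursor follows DBAPI 2.0, its rows can be turned into dictionaries using only the standard cursor.description attribute. The following is a minimal sketch, not part of databricks-dbapi: rows_as_dicts is a hypothetical helper name, and sqlite3 is used purely as a stand-in DBAPI connection so the snippet runs without a cluster. The column names a Databricks cursor reports may be formatted differently.

```python
import sqlite3


def rows_as_dicts(cursor):
    """Return all remaining rows as dicts keyed by column name.

    Uses only DBAPI 2.0 features: cursor.description and cursor.fetchall().
    """
    # The first item of each description entry is the column name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# Assumption for the demo: sqlite3 stands in for a Databricks connection.
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE some_table (id INTEGER, name TEXT)")
cursor.executemany("INSERT INTO some_table VALUES (?, ?)", [(1, "a"), (2, "b")])

cursor.execute("SELECT id, name FROM some_table ORDER BY id LIMIT 100")
rows = rows_as_dicts(cursor)
print(rows)
connection.close()
```

With a real connection, the same helper would be called right after cursor.execute(...) in place of fetchall().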

Using your username and password (not recommended):

    import os

    from databricks_dbapi import databricks

    user = os.environ["DATABRICKS_USER"]
    password = os.environ["DATABRICKS_PASSWORD"]
    host = os.environ["DATABRICKS_HOST"]
    cluster = os.environ["DATABRICKS_CLUSTER"]

    connection = databricks.connect(
        host=host,
        cluster=cluster,
        user=user,
        password=password
    )
    cursor = connection.cursor()

    cursor.execute("SELECT * FROM some_table LIMIT 100")

    print(cursor.fetchone())
    print(cursor.fetchall())

Connecting on the Azure platform, or using http_path:

    import os

    from databricks_dbapi import databricks

    token = os.environ["DATABRICKS_TOKEN"]
    host = os.environ["DATABRICKS_HOST"]
    http_path = os.environ["DATABRICKS_HTTP_PATH"]

    connection = databricks.connect(
        host=host,
        http_path=http_path,
        token=token,
    )
    cursor = connection.cursor()

    cursor.execute("SELECT * FROM some_table LIMIT 100")

    print(cursor.fetchone())
    print(cursor.fetchall())
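The token examples above differ only in whether cluster or http_path is passed. One way to support both from a single code path is to build the keyword arguments from the environment first. This is a sketch under assumptions: connect_kwargs_from_env is a hypothetical helper, the preference for http_path over cluster is a design choice made here rather than library behavior, and the values in example_env are placeholders.

```python
import os


def connect_kwargs_from_env(env=None):
    """Build keyword arguments for databricks.connect() from environment variables.

    Prefers DATABRICKS_HTTP_PATH when set, otherwise uses DATABRICKS_CLUSTER.
    """
    env = os.environ if env is None else env
    kwargs = {"host": env["DATABRICKS_HOST"], "token": env["DATABRICKS_TOKEN"]}

    http_path = env.get("DATABRICKS_HTTP_PATH")
    cluster = env.get("DATABRICKS_CLUSTER")
    if http_path:
        kwargs["http_path"] = http_path
    elif cluster:
        kwargs["cluster"] = cluster
    else:
        raise KeyError("set DATABRICKS_HTTP_PATH or DATABRICKS_CLUSTER")
    return kwargs


# Placeholder values, for illustration only.
example_env = {
    "DATABRICKS_HOST": "example.cloud.databricks.com",
    "DATABRICKS_TOKEN": "placeholder-token",
    "DATABRICKS_CLUSTER": "placeholder-cluster",
}
kwargs = connect_kwargs_from_env(example_env)
print(kwargs)

# With the package installed, the result would be used as:
# connection = databricks.connect(**kwargs)
```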

The pyhive connection also provides async functionality:

    import os

    from databricks_dbapi import databricks
    from TCLIService.ttypes import TOperationState

    token = os.environ["DATABRICKS_TOKEN"]
    host = os.environ["DATABRICKS_HOST"]
    cluster = os.environ["DATABRICKS_CLUSTER"]

    connection = databricks.connect(
        host=host,
        cluster=cluster,
        token=token,
    )
    cursor = connection.cursor()

    cursor.execute("SELECT * FROM some_table LIMIT 100", async_=True)

    status = cursor.poll().operationState
    while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
        logs = cursor.fetch_logs()
        for message in logs:
            print(message)

        # If needed, an asynchronous query can be cancelled at any time with:
        # cursor.cancel()

        status = cursor.poll().operationState

    print(cursor.fetchall())
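The polling loop above runs for as long as the query does. If an upper bound is wanted, the same poll() and cancel() calls can be wrapped with a deadline. The sketch below is an illustration, not part of the library: wait_for_query and FakeCursor are hypothetical names, and the string states stand in for the TOperationState values so the snippet runs without a cluster.

```python
import time


def wait_for_query(cursor, running_states, timeout_s=60.0, poll_interval_s=1.0):
    """Poll until the query leaves running_states; cancel it on timeout.

    Returns the final operation state reported by cursor.poll().
    """
    deadline = time.monotonic() + timeout_s
    status = cursor.poll().operationState
    while status in running_states:
        if time.monotonic() >= deadline:
            cursor.cancel()
            raise TimeoutError("query did not finish within %.1f s" % timeout_s)
        time.sleep(poll_interval_s)
        status = cursor.poll().operationState
    return status


class _FakeResponse:
    def __init__(self, state):
        self.operationState = state


class FakeCursor:
    """Stand-in for a pyhive cursor that replays a scripted list of states."""

    def __init__(self, states):
        self._states = list(states)
        self.cancelled = False

    def poll(self):
        # Return the next scripted state; keep repeating the last one.
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return _FakeResponse(state)

    def cancel(self):
        self.cancelled = True


final_state = wait_for_query(
    FakeCursor(["RUNNING", "RUNNING", "FINISHED"]),
    running_states={"RUNNING"},
    timeout_s=5.0,
    poll_interval_s=0.01,
)
print(final_state)
```

With a real cursor, running_states would be (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE), as in the loop above.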

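SQLAlchemy

Once the databricks-dbapi package is installed, the databricks+pyhive dialect/driver is registered with SQLAlchemy. Fill in the required information when passing the engine URL.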

    from sqlalchemy import *
    from sqlalchemy.engine import create_engine
    from sqlalchemy.schema import *

    # Standard Databricks with user + password
    # provide user, password, company name for url, database name, cluster name
    engine = create_engine(
        "databricks+pyhive://<user>:<password>@<companyname>.cloud.databricks.com:443/<database>",
        connect_args={"cluster": "<cluster>"}
    )

    # Standard Databricks with token
    # provide token, company name for url, database name, cluster name
    engine = create_engine(
        "databricks+pyhive://token:<databricks_token>@<companyname>.cloud.databricks.com:443/<database>",
        connect_args={"cluster": "<cluster>"}
    )

    # Azure Databricks with user + password
    # provide user, password, region for url, database name, http_path (with cluster name)
    engine = create_engine(
        "databricks+pyhive://<user>:<password>@<region>.azuredatabricks.net:443/<database>",
        connect_args={"http_path": "<azure_databricks_http_path>"}
    )

    # Azure Databricks with token
    # provide token, region for url, database name, http_path (with cluster name)
    engine = create_engine(
        "databricks+pyhive://token:<databrickstoken>@<region>.azuredatabricks.net:443/<database>",
        connect_args={"http_path": "<azure_databricks_http_path>"}
    )

    # Reflect an existing table and count its rows (SQLAlchemy 1.x-style API)
    logs = Table("my_table", MetaData(bind=engine), autoload=True)
    print(select([func.count("*")], from_obj=logs).scalar())
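Tokens and passwords can contain characters such as "/" or "@" that have special meaning inside a URL, so it is usually safer to percent-encode them before building the engine URL. A minimal sketch, assuming the URL layout shown above: databricks_url is a hypothetical helper, and the host, database, and token values are placeholders.

```python
from urllib.parse import quote_plus


def databricks_url(host, database, token=None, user=None, password=None, port=443):
    """Build a databricks+pyhive engine URL with percent-encoded credentials."""
    if token is not None:
        # Token auth uses the literal user name "token", as in the URLs above.
        user, password = "token", token
    if user is None or password is None:
        raise ValueError("provide either token, or user and password")
    return "databricks+pyhive://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, database
    )


# Placeholder values, for illustration only.
url = databricks_url("example.cloud.databricks.com", "default", token="dapi/abc@123")
print(url)

# With SQLAlchemy and databricks-dbapi installed, the result would be used as:
# engine = create_engine(url, connect_args={"cluster": "<cluster>"})
```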

For details on the hostname, cluster name, and HTTP path, refer to the Databricks documentation.
