一套用于PostgreSQL数据库查询和基于sqlalchemy的操作的实用程序
pg-database-utils的Python项目详细描述
pg数据库实用工具
一套用于PostgreSQL数据库查询和基于sqlalchemy的操作的实用程序。在
此库支持:
TSVECTOR
、JSON
和{}索引(适用于PostgreSQL 9.5+版本) - 生成的列(对于PostgreSQL版本12+)
- Django项目的可选Django数据库配置
它还包括:
- 帮助程序使最常见的DDL查询更具可读性
- 用于查询JSON和TSVECTOR列的Performant函数
- 支持来自现有表和/或
VALUES
子句的SELECT INTO
查询 - 支持需要应用程序逻辑的
UPDATE
查询
安装
安装方式:
pip install pg-database-utils
配置
此项目旨在使配置更简单。 如果您已经在Django中定义了数据库连接,那么您可以重用它们; 否则,您可以在没有Django作为依赖项的情况下配置自己的。在
配置Django
如果您想使用“default”数据库,不需要配置。在
如果要指定要从中读取设置的特定Django数据库:
- 使用数据库名称创建JSON配置文件:
- 设置
DATABASE_CONFIG_JSON
环境变量以指向文件的位置
Note:“django db key”优先于JSON文件中的所有其他数据库连接设置。 如果指定Django数据库,则将使用这些数据库连接设置。在
不使用Django进行配置
- 创建至少具有所需设置的JSON配置文件(即
database-name
):
{"database-name":"required",# Name of the database to query"database-engine":"optional",# Defaults to postgres"database-host":"optional",# Defaults to 127.0.0.1"database-port":"optional",# Defaults to 5432"database-user":"optional",# Defaults to postgres"database-password":"optional"# For trusted users like postgres}
- 设置
DATABASE_CONFIG_JSON
环境变量以指向文件的位置
带或不带Django的附加配置
其他配置选项包括:
{"connect-args":{"sslmode":"require"},# Defaults to postgres settings, "prefer" by default"date-format":"optional",# Defaults to "%Y-%m-%d""timestamp-format":"optional",# Defaults to "%Y-%m-%d %H:%M:%S""pooling-args":{# To override sqlalchemy pooling config"max_overflow":0,# Defaults to 10 connections beyond pool size"pool_recycle":60,# Defaults to no timeout (-1) in seconds"pool_size":20,# Defaults to 5 connections"pool_timeout":30# Defaults to 30 seconds}}
Note:“日期格式”和“时间戳格式”必须与PostgreSQL中配置的格式兼容。在
使用
此库旨在使常用数据库操作变得简单易读,
因此,大多数实用程序函数都设计为使用字符串或sqlalchemy
对象作为参数。在
模式实用程序
- 创建和关联表
frompg_databaseimportschemamy_table=schema.create_table("my_table",dropfirst=True,index_cols={"id":"unique"},id="int",name="int",addr="text",geom="bytea",deleted="bool")schema.create_index(my_table,"name",index_op="unique")schema.create_table("other_table",id="int",my_table_id="int",val="text")schema.create_foreign_key("other_table","my_table_id","my_table.id")
- 正在更改表
frompg_databaseimportschemaschema.alter_column_type("my_table","name","text")schema.create_index("my_table","name",index_op="to_tsvector")schema.create_column("my_table","json_col","jsonb",checkfirst=True)schema.create_index("my_table","json_col",index_op="json_full")# These steps require the postgis extensionschema.alter_column_type("my_table","geom","geometry",using="geometry(Polygon,4326)")schema.create_index("my_table","geom",index_op="spatial")
- 删除数据库对象
frompg_databaseimportschemaall_tables=schema.get_metadata().tablesother_table=all_tables["other_table"]schema.drop_foreign_key(other_table,"other_table_my_table_id_fkey")schema.drop_index("my_table",index_name="my_table_json_col_json_full_idx")schema.drop_table("my_table")schema.drop_table(other_table)
SQL实用程序
- 插入行
importjsonfromdatetimeimportdatetime,timedeltafrompg_databaseimportsqlcreate_date=datetime.now()sql.select_into("new_table",[(1,"one",{},create_date),(2,"two",{},create_date),(3,"three",{},create_date)],"id,val,json,created","int,text,jsonb,date")
- 正在更新行
frompg_databaseimportsqldefupdate_row(row):row=list(row)pk,val,created,jval=row[0],row[1],row[2],row[3]row[1]=f"{pk}{val} first batch"row[2]=created+timedelta(days=1)row[3]={"id":pk,"val":val,"batch":"first"}returnrowsql.update_rows("new_table","id","val,created,json",update_row,batch_size=3)
- 查询行
frompg_databaseimportsql,schema# Reduce database queries by sending a sqlalchemy tableall_tables=schema.get_metadata().tablesnew_table=all_tables["new_table"]schema.create_index(new_table,"json",index_op="json_path")schema.create_index(new_table,"val",index_op="to_tsvector")sql.query_json_keys(new_table,"json",{"batch":"first"})sql.query_tsvector_columns("new_table","val","batch first")
- 用于
INSERT
s或SELECT INTO
的Values子句,在执行时使用自定义连接参数
fromdatetimeimportdatetimefromsqlalchemyimportcolumnfromsqlalchemy.sqlimportInsert,Selectfrompg_databaseimportsql,schema# Prepare data, column names, column types and table namecreate_date=datetime.now()values_data=[(1,"one",{},True,create_date),(2,"two",{},False,create_date),(3,"three",{},0,create_date)]values_names=["id","val","json","boolean","created"]values_types=["int","text","jsonb","bool","date"]values_table="new_table"# SELECT INTO to create a new table from raw values using sslmode==requireselect_vals=sql.Values(values_names,values_types,*values_data)select_into=sql.SelectInto([column(c)forcinvalues_names],values_table)withschema.get_engine(connect_args={"sslmode":"require"}).connect()asconn:conn.execute(select_into.select_from(select_vals).execution_options(autocommit=True))# INSERT INTO to add new records from raw values using custom pooling argsexisting_table=schema.get_metadata().tables[values_table]insert_vals=sql.Values(values_names,values_types,*values_data)insert_from=Select([column(c)forcinvalues_names]).select_from(insert_vals)insert_into=Insert(existing_table).from_select(names=values_names,select=insert_from)withschema.get_engine(pooling_args={"pool_size":20,"max_overflow":0}).connect()asconn:conn.execute(insert_into.execution_options(autocommit=True))
- 项目
标签: