import pandas as pd
import sqlalchemy
import uuid
import os
def _quote_identifier(name) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def _index_labels(df: pd.DataFrame) -> list:
    """Return explicit column labels for the index levels of *df*.

    Unnamed levels fall back to ``index`` (single level) or ``level_<n>`` so
    that the labels handed to ``to_sql`` and the ones used in the upsert SQL
    always agree.
    """
    names = list(df.index.names)
    if len(names) == 1 and names[0] is None and "index" not in df.columns:
        return ["index"]
    return [f"level_{pos}" if name is None else name for pos, name in enumerate(names)]


def upsert_df(df: pd.DataFrame, table_name: str, engine: sqlalchemy.engine.Engine):
    """Insert or update the rows of ``df`` in the PostgreSQL table ``table_name``.

    Implements the equivalent of ``pd.DataFrame.to_sql(..., if_exists='update')``
    (which does not exist). Rows conflict, and are therefore updated instead
    of inserted, when their index values match an existing row.

    Steps when the table already exists:
        1. Write the dataframe to a uniquely named staging table.
        2. (Re)create a UNIQUE constraint on the index columns, which
           ``ON CONFLICT`` requires.
        3. ``INSERT ... ON CONFLICT ... DO UPDATE`` from the staging table.
        4. Drop the staging table, even if an earlier step failed.

    If the table does not exist in the ``public`` schema it is simply created
    with ``to_sql``.

    Args:
        df: Data to persist. Its index levels become the conflict key.
        table_name: Name of the target table.
        engine: SQLAlchemy engine connected to a PostgreSQL database.

    Returns:
        True if successful.

    Raises:
        sqlalchemy.exc.SQLAlchemyError: If any statement fails, e.g. because
            existing rows violate the unique constraint on the index columns.

    Note:
        Statements are executed through ``sqlalchemy.text``, so an identifier
        containing a ``:name`` sequence would be read as a bind parameter.
    """
    index = _index_labels(df)

    # Bind the table name as a parameter instead of interpolating it into a
    # SQL string literal.
    exists_stmt = sqlalchemy.text(
        """SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = :table_name);
        """
    )
    with engine.connect() as conn:
        table_exists = conn.execute(exists_stmt, {"table_name": table_name}).scalar()

    # If the table does not exist, we should just use to_sql to create it
    if not table_exists:
        df.to_sql(table_name, engine, index_label=index)
        return True

    # If it already exists...
    columns = list(df.columns)
    target = _quote_identifier(table_name)
    index_sql_txt = ", ".join(_quote_identifier(i) for i in index)
    # index1, index2, ..., column 1, col2, ...
    headers_sql_txt = ", ".join(_quote_identifier(i) for i in index + columns)

    if columns:
        # col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
        assignments = ", ".join(
            f"{_quote_identifier(col)} = EXCLUDED.{_quote_identifier(col)}"
            for col in columns
        )
        conflict_action = f"DO UPDATE SET {assignments}"
    else:
        # Nothing but the key columns: an empty SET clause would be invalid SQL.
        conflict_action = "DO NOTHING"

    # A UNIQUE constraint creates an index of the same name, and index names
    # are unique per schema, so the constraint name must differ per table.
    constraint = _quote_identifier(f"{table_name}_unique_constraint_for_upsert")

    temp_table_name = f"temp_{uuid.uuid4().hex[:6]}"
    staging = _quote_identifier(temp_table_name)
    try:
        df.to_sql(temp_table_name, engine, index=True, index_label=index)
        # One transaction: the constraint change and the upsert commit together
        # or not at all.
        with engine.begin() as conn:
            # Remove the constraint name used by earlier versions of this function.
            conn.execute(sqlalchemy.text(
                f"ALTER TABLE {target} "
                "DROP CONSTRAINT IF EXISTS unique_constraint_for_upsert"
            ))
            # For the ON CONFLICT clause, postgres requires that the columns
            # have a unique constraint.
            conn.execute(sqlalchemy.text(
                f"ALTER TABLE {target} DROP CONSTRAINT IF EXISTS {constraint}"
            ))
            conn.execute(sqlalchemy.text(
                f"ALTER TABLE {target} ADD CONSTRAINT {constraint} "
                f"UNIQUE ({index_sql_txt})"
            ))
            # Compose and execute upsert query
            conn.execute(sqlalchemy.text(
                f"INSERT INTO {target} ({headers_sql_txt}) "
                f"SELECT {headers_sql_txt} FROM {staging} "
                f"ON CONFLICT ({index_sql_txt}) {conflict_action}"
            ))
    finally:
        # Always clean up the staging table, including after a failure.
        with engine.begin() as conn:
            conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {staging}"))
    return True
from sqlalchemy import create_engine
# Create an engine from a PostgreSQL connection URI. The username, password,
# host, port and database parts below are placeholders to be replaced.
cnxn = create_engine("postgresql+psycopg2://username:password@host:port/database")
# Write the DataFrame to table "my_table" in schema "myschema".
# NOTE: `df` is not defined in this snippet; it is assumed to be an existing
# pandas DataFrame.
df.to_sql("my_table", con=cnxn, schema="myschema")
如果您使用的是 PostgreSQL 9.5 或更高版本,则可以借助临时表和 `INSERT ... ON CONFLICT` 语句执行 UPSERT。
我已经多次需要这个功能,最后为它创建了一个 gist。
函数如下所示,如果是第一次持久化数据帧,它将创建表,如果表已经存在,它将更新表:
如果您已经有一个 pandas 数据帧,那么可以使用 `df.to_sql` 直接通过 SQLAlchemy 推送数据。
相关问题 更多 >
编程相关推荐