Bulk Insert a Pandas DataFrame Using SQLAlchemy

Posted 2024-09-24 02:21:20


I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to Microsoft SQL Server via SQLAlchemy. The pandas.to_sql method, while nice, is slow.

I'm having trouble writing the code...

I want to be able to pass this function a pandas DataFrame which I'm calling table, a schema name which I'm calling schema, and a table name which I'm calling name. Ideally, the function will 1) delete the table if it already exists, 2) create a new table, 3) create a mapper, and 4) bulk insert using the mapper and the pandas data. I'm stuck on part 3.

Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need a primary key, but the mapper function seems to require one.

Thanks for any insight.

from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e, reflect=True, schema=schema)
    Base = declarative_base(bind=e, metadata=m)
    # 1) drop the table if it already exists
    t = Table(name, m)
    m.remove(t)
    t.drop(checkfirst=True)
    # 2) create the new table from the DataFrame's schema
    sqld = SQLDatabase(e, schema=schema, meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e, tables=[sqlt])
    # 3) create the mapper -- this is where I'm stuck
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)
    # 4) bulk insert using the mapper and the pandas data
    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
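
For context on step 3: SQLAlchemy's classical mapper() refuses to map a table that has no primary key, but it accepts a primary_key argument naming columns to treat as the key even when no PRIMARY KEY constraint exists. A minimal sketch under that assumption (the 'index' column pandas writes by default is a guess; any sufficiently unique column works):

# Sketch only: satisfy mapper()'s primary-key requirement without a real PK.
# Assumes pandas wrote its default 'index' column; substitute any column
# that is unique enough to stand in as a key.
class MyMapped(object):
    pass

mapper(MyMapped, sqlt, primary_key=[sqlt.c['index']])
s.bulk_insert_mappings(MyMapped, table.to_dict(orient='records'))

Using a plain class with classical mapping, rather than the declarative Base, sidesteps declarative's own primary-key and __tablename__ checks.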

3 Answers

Based on @ansonw's answer:

import cStringIO  # Python 2; on Python 3 use io.StringIO instead

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create the table from the DataFrame's schema (zero rows)
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare the data as an in-memory CSV
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert the data with PostgreSQL's COPY via psycopg2's copy_from
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

This inserted 200,000 rows in 5 seconds instead of 4 minutes.
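
For reference, a hypothetical call might look like this (the connection URL and table name are placeholders); note that copy_from is a psycopg2 feature, so this helper is PostgreSQL-specific:

from sqlalchemy import create_engine

# Hypothetical usage; URL, credentials and table name are placeholders
engine = create_engine('postgresql://user:password@localhost:5432/mydb')
to_sql(engine, df, 'my_table', if_exists='replace')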

I ran into a similar problem, with pd.to_sql taking hours to upload data. The code below bulk-inserted the same data in a few seconds.

from sqlalchemy import create_engine
import psycopg2 as pg
# cStringIO provides an in-memory text buffer for streaming the DataFrame (Python 2)
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone
);'''
cursor.execute(command)
connection.commit()

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    
connection.commit()
cur.close()
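
A small note: cStringIO no longer exists on Python 3. A minimal adaptation of the streaming step, assuming the same table and connection as above, uses io.StringIO:

import io

# Python 3 equivalent of the streaming step above (same table assumed)
output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)

cur = connection.cursor()
cur.copy_from(output, 'localytics_app2', null='')
connection.commit()
cur.close()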

This may already have been answered by now, but I found the solution by collating different answers on this site and aligning them with SQLAlchemy's documentation.

  1. The table needs to already exist in db1, with its index column set to auto_increment.
  2. The class Current needs to align with the DataFrame imported from the CSV and the table in db1.

Hopefully this helps anyone who comes here wanting to mix pandas and SQLAlchemy efficiently.

from urllib import quote_plus as urlquote  # Python 3: from urllib.parse import quote_plus
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# pandas reads the CSV into a DataFrame
df_to_be_written = pd.read_csv(fileToRead)
# orient='records' is the key here: it yields the list-of-dicts format that SQLAlchemy's docs describe for bulk inserts
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Insert the dataframe into the database in one bulk operation
conn.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()
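
As a variation, the same listToWrite can also go through the ORM bulk path that the original question was after; a minimal sketch using the Current class declared above (behavior assumed equivalent to the Core insert):

# Sketch: the ORM bulk path the question originally asked about.
# bulk_insert_mappings takes the mapped class and a list of dicts.
session.bulk_insert_mappings(Current, listToWrite)
session.commit()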
