<p>I'll answer this with a complete worked example, since I ran into a few related questions along the way.</p>
<p>This example:</p>
<ul>
<li>Loads data quickly into an MS SQL database using <strong>fast_executemany</strong> with user-defined, memory-friendly <strong>chunking</strong></li>
<li>Loads 10,000 records (25 columns) into a Microsoft SQL (MSSQL) database in roughly 0.3 seconds</li>
<li>Loads 1,000,000 records (25 columns) into a Microsoft SQL (MSSQL) database in roughly 45 seconds</li>
<li>Loads 10,000,000 records (25 columns) into a Microsoft SQL (MSSQL) database in roughly 9 minutes</li>
<li>Chunks the data with a pre-sized helper function, avoiding pandas' <code>chunksize</code> parameter, which can cause memory errors on larger datasets. <a href="https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html" rel="nofollow noreferrer">Credit</a> for the chunking approach</li>
<li>Can help you load data into a SQL Server with limited RAM: bulk loads consume a lot of memory, while smaller chunk sizes use far less</li>
<li>Can be extended with a try/except block to catch per-chunk load errors, e.g. to log failed chunks and retry them later</li>
</ul>
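<p>The try/except idea from the last bullet can be sketched as a small retry wrapper. This is a minimal sketch, not part of the original code: <code>load_chunk</code> is a hypothetical callable (e.g. a closure around <code>chunk.to_sql</code>), and the fixed-delay retry policy is an assumption you would tune for your environment.</p>
<pre><code>import time

def insert_chunks_with_retry(chunks, load_chunk, retries=3, delay=1.0):
    """Attempt to load each chunk, retrying transient failures;
    return the indices of chunks that never succeeded."""
    failed = []
    for i, chunk in enumerate(chunks):
        for attempt in range(1, retries + 1):
            try:
                load_chunk(chunk)  # e.g. lambda c: c.to_sql(..., if_exists="append")
                break
            except Exception as exc:
                print(f"chunk {i}, attempt {attempt}/{retries} failed: {exc}")
                time.sleep(delay)
        else:
            failed.append(i)  # retries exhausted; record for a later re-run
    return failed
</code></pre>
<p>The returned indices can be persisted and fed back into the same function later, which is the "retry type setup" the bullet above alludes to.</p>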
<p>I've also included some untested connection strings for a few other DB providers.
Written against the then-current versions of pandas, sqlalchemy, pyodbc, etc. as of December 2020.</p>
<pre><code>%%time  # remove this line if you are not using a Jupyter notebook and just want to run a .py script
import pandas as pd
import numpy as np
import sqlalchemy as sql
import sys
import math

# Enterprise DB to be used
DRIVER = "ODBC Driver 17 for SQL Server"
USERNAME = "TestUser"
PSSWD = "TestUser"
SERVERNAME = "DESKTOP-QLSOTTG"
INSTANCENAME = r"\SQLEXPRESS"  # raw string keeps the leading backslash intact
DB = "TestDB"
TABLE = "perftest"

conn_executemany = sql.create_engine(
    f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}",
    fast_executemany=True,
)

def chunker(seq, size):
    # Yield successive slices of `size` rows (the last one may be shorter)
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))

def insert_with_progress(df, engine, table="", schema=""):
    con = engine.connect()
    # Replace the table; only schema-qualify the name if a schema was given
    full_name = f"{schema}.{table}" if schema else table
    engine.execute(f"DROP TABLE IF EXISTS {full_name};")
    # Insert in memory-friendly chunks
    SQL_SERVER_CHUNK_LIMIT = 100000
    chunksize = math.floor(SQL_SERVER_CHUNK_LIMIT / len(df.columns))
    for chunk in chunker(df, chunksize):
        chunk.to_sql(
            name=table,
            con=con,
            if_exists="append",
            index=False,
        )

df = pd.DataFrame(np.random.random((10 ** 7, 24)))
df['TextCol'] = "Test Goes Here"
df.head()

print("DataFrame is", round(sys.getsizeof(df) / 1024 ** 2, 1), "MB")
print("DataFrame contains", len(df), "rows by", len(df.columns), "columns")

# Doing it like this errors out. Can't seem to be able to debug the straight pandas call.
# df.to_sql(TABLE, conn_sqlalchemy, index=False, if_exists='replace', method='multi', chunksize=2100)
insert_with_progress(df, conn_executemany, table=TABLE)
</code></pre>
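<p>To make the chunk arithmetic concrete, here is the same <code>chunker</code> run on a plain list instead of a DataFrame (slicing works the same way on both). With the 100,000-cell budget and 25 columns, each chunk holds 4,000 rows:</p>
<pre><code>import math

def chunker(seq, size):
    # Yield successive slices of length `size` (the last may be shorter)
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))

SQL_SERVER_CHUNK_LIMIT = 100_000
n_columns = 25
chunksize = math.floor(SQL_SERVER_CHUNK_LIMIT / n_columns)  # 4000 rows per chunk

rows = list(range(10_000))
chunks = list(chunker(rows, chunksize))
print(len(chunks))      # 3 chunks: 4000 + 4000 + 2000 rows
print(len(chunks[-1]))  # 2000
</code></pre>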
<p><strong>On the connection strings:</strong></p>
<ol>
<li>If you want to switch to another DB type, you will most likely only need to change the line starting with <code>f"mssql+pyodbc://</code></li>
<li>If your SQL Server does not use an instance name (e.g. SQLSERVERNAME\instance_name), you can set the instance-name variable to an empty string</li>
<li>If you do use an instance name, make sure to keep the leading \ at the start of the variable</li>
<li>If you use a different connection string, also replace the engine variable name in the last line of the code window above</li>
</ol>
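<p>A small sketch illustrating points 2 and 3: the instance name, when present, keeps its leading backslash (a raw string avoids Python escape-sequence surprises), and for a default instance the variable can simply be an empty string. Server, user, and DB names are the placeholders from the example above:</p>
<pre><code>USERNAME = "TestUser"
PSSWD = "TestUser"
SERVERNAME = "DESKTOP-QLSOTTG"
DB = "TestDB"
DRIVER = "ODBC Driver 17 for SQL Server"

# Named instance: keep the leading backslash
INSTANCENAME = r"\SQLEXPRESS"
url_named = f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}"

# Default instance: an empty string leaves the URL valid
INSTANCENAME = ""
url_default = f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}"

print(url_named)
print(url_default)
</code></pre>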
<p><strong>Alternative import statements for other providers</strong><br/>
These cover:</p>
<ul>
<li>pymssql</li>
<li>turbodbc</li>
</ul>
<pre><code>import pymssql as ms
import sqlalchemy as sql
import sqlalchemy_turbodbc as st
</code></pre>
<p><strong>Alternative connection strings</strong><br/>
<a href="https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html" rel="nofollow noreferrer">Credit</a> for the DSN-style strings, which I have modified to use username/password</p>
<pre><code>conn_sqlalchemy = sql.create_engine(f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")
conn_executemany = sql.create_engine(
f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}", fast_executemany=True
)
conn_turbodbc = sql.create_engine(f"mssql+turbodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")
conn_pymssql = sql.create_engine(f"mssql+pymssql://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}")
</code></pre>