我们正在用Python开发一个数据摄取工具,在读取(多个)AS400表时遇到了速度缓慢的问题
这是该工具的核心部分(特定于读取AS400表格)。正如你所看到的,它做的不多;它只是从AS400数据库读取表,向其中添加一个时间戳列,然后将其写入另一个目标数据库,在本例中,该数据库位于MS SQL Server中
def process(record):
engine_source = new_as400_connection()
engine_destination = new_db_connection()
ROWS = 100_000
df_gen = pd.read_sql(record.query, engine_source, chunksize=ROWS)
for df in df_gen:
df['_ETLDate'] = pd.to_datetime('today').replace(microsecond=0)
df.to_sql(
record.target,
engine_destination,
if_exists='append',
index=False,
)
engine_source.dispose()
engine_destination.dispose()
下面是我们如何使用内置的ThreadPoolExecutor
调用此函数:
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor(16) as executor:
__ = executor.map(process, records)
其中records
是要处理的表的列表,如下所示:
┌───────────┬───────────┬────────────────────────────────────────┐
│ source_id │ target │ query │
├───────────┼───────────┼────────────────────────────────────────┤
│ TABLE001 │ target_01 │ SELECT * FROM TABLE001 │
│ TABLE002 │ target_02 │ SELECT * FROM TABLE002 │
│ TABLE003 │ target_03 │ SELECT * FROM TABLE003 WHERE ABC > 100 │
└───────────┴───────────┴────────────────────────────────────────┘
显然,其想法是并行地将数据从AS400复制到SQL Server。但是,这种方法比使用SSIS包执行的等效作业要慢。区别非常显著:此Python代码比SSIS包花费的时间多3-4倍(即使后者是按顺序执行的)
我们尝试过的事情:
pyodbc
的cursor.fetchmany
。它的性能几乎与此相同。因此,我们认为这种急剧的缓慢不是由于使用pandas.read_sql
李>一位同事建议在子shell中的单独进程中调用每个表的复制进程。因此,我们从多线程方法切换到这个子进程方法:
if __name__ == '__main__':
args = parser.parse_args()
if args.single_record:
process(json.loads(record))
else:
cmd = './venv/Scripts/python -m sourcetostaging --single_record'
for record in records:
subprocess.Popen(
[*cmd.split(), json.dumps(record)],
cwd='./',
stdout=subprocess.DEVNULL,
)
它运行良好:我们看到了大约60%-70%的改善。但是,此代码仍然比SSIS包慢:此修改的方法花费的时间是SSIS包的两倍(即使后者是按顺序执行的)
我们理解,因为SSIS使用SQL,所以它总是比同等的Python代码快。然而,再多花2倍的时间似乎表明我在Python代码中做了一些错误的事情
以下是我们正在使用的连接字符串:
def new_as400_connection():
cs = 'ibm_db_sa+pyodbc:///?odbc_connect=' + quote(
'DRIVER={iSeries Access ODBC Driver};'
+ 'SYSTEM={system};UID={uid};PWD={pwd}'.format(**config_as400)
)
return sqlalchemy.create_engine(cs)
def new_db_connection():
cs = 'mssql+pyodbc://{user}:{pwd}@{host}/{database}?driver={driver}'
return sqlalchemy.create_engine(
cs.format(**config_db),
fast_executemany=True,
)
还有其他我们忽略的方法吗?我们认为多线程方法非常常见
注释
for df in df_gen:
(或者cursor.fetchmany
在另一个包中)是最耗时的李>
我猜性能问题与ODBC有关
我将编写一个web服务,该服务按键返回表行的子集。下面是PHP代码,它通过客户代码从产品主控中读取行。您可以用python或node编写类似的web服务
web服务的一个优点是,您可以专门为正在阅读的表编写代码。如果读取整个库存文件,您可以从项目编号开始,然后
fetch first 50000 rows only
。当读取下一批时,客户机可以告诉服务器web服务从何处开始读取相关问题 更多 >
编程相关推荐