我有一个MS SQL Server数据库,有1200个大小不同的表。数据总量约为2TB,最大的表约为400GB。每三个月一次,我需要以自动化的方式将这些数据输入BigQuery。数据不必以任何方式进行转换,但该过程不能是手动的,而且我无法访问Windows服务器本身
目前,这是通过Dataproc(Spark,Scala)集群完成的,该集群具有80个CPU和1,4TB RAM。这对我来说是非常浪费的,因为这是一个简单的数据传输
相反,我想做的是使用Python检索数据并将其写入Avro文件,然后我可以非常快速地将其加载到BigQuery中。我的问题是,在单个表上使用pyodbc进行的初始测试表明,我在最初几秒钟(60位)内获得了很高的速度,然后速度下降到6位左右。如果我并行地对3个表(不同的连接)执行此操作,它是相同的模式,但会下降到3x6=18MBit
奖金信息:这发生在GCP和AWS之间的公司VPN隧道上
因此,我的问题是:
以下是我用于测试的代码:
import csv
import sys
import tracemalloc
import pyodbc
from datetime import datetime
tracemalloc.start()
test_table = sys.argv[1]
fetch_rows = 1000 # Avg 6MBit
output_file = f'{test_table}.csv'
delimiter = ';'
quote_char = '"'
print('Connecting')
connection = pyodbc.connect('Driver={ODBC Driver 17 for SQL Server};'
'Server=X;'
'Database=Y;'
'uid=Z;'
'pwd=W')
print('Connected')
cursor = connection.cursor()
print('Selecting')
cursor.execute(f"SELECT * FROM {test_table}")
print('Executed')
with open(output_file, 'w+', newline='', encoding='utf-8') as f:
writer = csv.writer(f, delimiter=delimiter, quotechar=quote_char, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow([x[0] for x in cursor.description]) # column headers
rows = cursor.fetchmany(fetch_rows)
rows_written = 0
while len(rows) > 0:
for row in rows:
writer.writerow(row)
rows_written = rows_written + 1
current, peak = tracemalloc.get_traced_memory()
print(f"{datetime.now()}: {rows_written} rows written. Current memory usage is {current / 10 ** 6}MB;"
f" Peak was {peak / 10 ** 6}MB")
rows = cursor.fetchmany(fetch_rows)
cursor.close()
tracemalloc.stop()
目前没有回答
相关问题 更多 >
编程相关推荐