如何使用并发性在Python中迁移数据库?

2024-05-19 20:27:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个用于将数据从SQLite迁移到Postgres的脚本。我只是使用for循环一个接一个地传输表。现在,我想尝试使用线程、多处理或异步IO并发传输多个表,以加速程序,比较这些方法之间的运行时。 你是怎么做到的

这是我的剧本:

import psycopg2, sqlite3, sys
import time
import multiprocessing



sqdb="C://Users//duongnb//Desktop//Python//SqliteToPostgreFull//testmydb6.db"
sqlike="table"
pgdb="testmydb11"
pguser="postgres"
pgpswd="1234"
pghost="127.0.0.1"
pgport="5432"


consq=sqlite3.connect(sqdb)
cursq=consq.cursor()

tabnames=[]
print() 
cursq.execute('SELECT name FROM sqlite_master WHERE type="table" AND name LIKE "%table%";')
tabgrab = cursq.fetchall()
for item in tabgrab:
    tabnames.append(item[0])
print(tabgrab)

def copyTable(table):
        print(table)
        cursq.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name = ?;", (table,))
        create = cursq.fetchone()[0]
        cursq.execute("SELECT * FROM %s;" %table)
        rows=cursq.fetchall()
        colcount=len(rows[0])
        pholder='%s,'*colcount
        newholder=pholder[:-1]

        try:

            conpg = psycopg2.connect(database=pgdb, user=pguser, password=pgpswd,
                                host=pghost, port=pgport)
            curpg = conpg.cursor()
            curpg.execute("DROP TABLE IF EXISTS %s;" %table)
            create = create.replace("AUTOINCREMENT", "")
            curpg.execute(create)
            curpg.executemany("INSERT INTO %s VALUES (%s);" % (table, newholder),rows)
            conpg.commit()

            if conpg:
                conpg.close()

        except psycopg2.DatabaseError as e:
            print ('Error %s' % e) 
            sys.exit(1)

        finally:
            print("Complete")    

consq.close()

if __name__ == "__main__":
    start_time = time.time()
    for table in tabnames:
        p = multiprocessing.Process(target = copyTable, args = (table))
        p.start()
    for table in tabnames:
        p.join()
    print("All processes finished.")      

    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Tags: nameimportforexecutetimecreatetableselect
1条回答
网友
1楼 · 发布于 2024-05-19 20:27:29

您应该将for table in tabnames的内部函数放入函数中,例如copyTable。然后您就可以使用multiprocessing包来并行化代码。它应该是这样的:

for table in tabnames:
    p = multiprocessing.Process(target = copyTable, args = (table))
    p.start()
for table in tabnames:
    p.join()
print("All processes finished.")

但是,如果使用COPYhttps://www.postgresql.org/docs/current/sql-copy.html)而不是许多INSERT命令,则可以进一步提高代码的速度

除了multiprocessing模块之外,您还可以使用threading模块,它的工作原理非常类似。然后你有线程而不是进程。由于解释器锁的存在,我希望它的性能会更差

相关问题 更多 >