PythonPandasUps

2024-09-28 17:18:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有每天的数据管道,需要读取一个文件,并将数据写入postgres数据库。有些文件可能是新旧数据的混合体。你知道吗

我当前的过程是获取文件并将其写入Dataframe,将Dataframe转储到staging表,然后执行并向上插入。这是可行的,但我可能会更新完全相同的值。我会比较一行中的所有列,看看是否有一个新的值,这似乎比它的值更麻烦(有些表可能有30+列)。你知道吗

有没有更好或更优化的方法来做到这一点?我觉得我的表或源文件越大,这个过程需要的时间就越多。下面是我的过程的一些示例代码:

def example_import(file_path, engine):
    '''
    ETL example
    '''

    df_demo = download_from_ftp(file_path)

    if df_demo.empty:
        return

    # some cleaning might happen here

    df_demo.to_sql(con = engine, name = 'tbl', schema = 'staging', if_exists = 'replace', index = False, method = 'multi')

    with open('/somefolder/upsert.sql') as fp:
        upsert = fp.read()

    result = engine.execute(upsert)
    print('Inserted/Updated {} Records From {}'.format(result.rowcount, file_path))

示例数据:

当前表(实际上有更多的列)

Current Table

传入数据(新旧混合)。一些旧的PK有新的值)

enter image description here

追加插入后的数据

enter image description here

向上插入示例

INSERT INTO public."tbl" ("colA",
                                 "colB",
                                 "colC",
                                 "colD",
                                 "colE",
                                 "colF",
                                 "colG",
                                 "colH",
                                 "colI",
                                 "colJ",
                                 "colK",
                                 "colL",
                                 "colM",
                                 "colN",
                                 "colO",
                                 "colP",
                                 "colQ",
                                 "colR",
                                 "colS",
                                 "colT",
                                 "colU",
                                 "colV"
SELECT "colA",
         "colB",
         "colC",
         "colD",
         "colE"::DATE,
         "colF",
         "colG"::DATE,
         "colH",
         "colI"::DATE,
         "colJ"::DATE,
         "colK",
         "colL",
         "colM",
         "colN",
         "colO",
         "colP",
         "colQ",
         "colR",
         "colS",
         "colT",
         "colU",
         "colV"
FROM staging."tbl"
ON CONFLICT ("colA") DO UPDATE 
    SET "colB" = excluded."colB",
         "colC" = excluded."colC",
         "colD" = excluded."colD",
         "colE" = excluded."colE",
         "colF" = excluded."colF",
         "colG" = excluded."colG",
         "colH" = excluded."colH",
         "colI" = excluded."colI",
         "colJ" = excluded."colJ",
         "colK" = excluded."colK",
         "colL" = excluded."colL",
         "colM" = excluded."colM",
         "colN" = excluded."colN",
         "colO" = excluded."colO",
         "colP" = excluded."colP",
         "colQ" = excluded."colQ",
         "colR" = excluded."colR",
         "colS" = excluded."colS",
         "colT" = excluded."colT",
         "colU" = excluded."colU",
         "colV" = excluded."colV"

Tags: 数据coldcollexcludedcolecolccolbcolf