从NetCDF fi加载PostgreSQL数据库

2024-05-19 15:39:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个netCDF文件,有八个变量。(抱歉,无法共享实际文件) 每个变量都有两个维度,时间和地点。时间大约是14步,目前该站有38000个不同的ID。 所以对于38000个不同的“位置”(实际上只是一个id),我们有8个变量和14个不同的时间。在

$ncdump -h stationdata.nc
netcdf stationdata {
dimensions:
    station = 38000 ;
    name_strlen = 40 ;
    time = UNLIMITED ; // (14 currently)
variables:
    int time(time) ;
            time:long_name = "time" ;
            time:units = "seconds since 1970-01-01" ;
    char station_name(station, name_strlen) ;
            station_name:long_name = "station_name" ;
            station_name:cf_role = "timeseries_id" ;
    float var1(time, station) ;
            var1:long_name = "Variable 1" ;
            var1:units = "m3/s" ;
    float var2(time, station) ;
            var2:long_name = "Variable 2" ;
            var2:units = "m3/s" ;
...

这些数据需要加载到PostGres数据库中,以便将数据连接到与站点名称匹配的一些几何图形中,以便以后可视化。在

目前我已经在Python中用netCDF4模块完成了这项工作。有效,但需要永远! 现在我这样循环:

^{pr2}$

这需要我的机器运行几分钟,我觉得可以用更聪明的方法来完成。在

有人知道如何用更聪明的方法来做这件事吗?最好是Python。在


Tags: 文件nameidtime时间floatvariablelong
3条回答

组织循环以访问每次的所有变量。换句话说,一次读写一个记录,而不是一次读写一个变量。这可以极大地加快速度,尤其是如果源netCDF数据集存储在具有大磁盘块的文件系统中,例如1MB或更大。关于为什么这是更快的解释和数量级加速的讨论,请参见this NCO speedup discussion,从条目7开始。在

我不确定这是正确的方法,但我找到了一个很好的方法来解决这个问题,我想我应该分享它。在

在第一个版本中,脚本运行大约需要一个小时。重写代码后,它现在运行不到30秒!在

最重要的是使用numpy数组并将NetCDF阅读器中的变量数组转换为行,然后将所有列堆叠成一个矩阵。然后使用psycopg2 copy_from函数将该矩阵加载到数据库中。我从这个问题得到了密码

Use binary COPY table FROM with psycopg2

我的部分代码:

dates = num2date(rootgrp.variables['time'][:],units=rootgrp.variables['time'].units)
var1=rootgrp.variables['var1']
var2=rootgrp.variables['var2']

cpy = cStringIO.StringIO()

for timeindex, time in enumerate(dates):

    validtimes=np.empty(var1[timeindex].size, dtype="object")
    validtimes.fill(time)

    #  Transponse and stack the arrays of parameters
    #    [a,a,a,a]        [[a,b,c],
    #    [b,b,b,b]  =>     [a,b,c],
    #    [c,c,c,c]         [a,b,c],
    #                      [a,b,c]]

    a = np.hstack((
              validtimes.reshape(validtimes.size,1),
              stationnames.reshape(stationnames.size,1),
              var1[timeindex].reshape(var1[timeindex].size,1),
              var2[timeindex].reshape(var2[timeindex].size,1)
    ))

    # Fill the cStringIO with text representation of the created array
    for row in a:
            cpy.write(row[0].strftime("%Y-%m-%d %H:%M")+'\t'+ row[1] +'\t' + '\t'.join([str(x) for x in row[2:]]) + '\n')


conn = psycopg2.connect("host=postgresserver dbname=nc user=user password=passwd")
curs = conn.cursor()

cpy.seek(0)
curs.copy_from(cpy, 'ncdata', columns=('validtime', 'stationname', 'var1', 'var2'))
conn.commit()

你可以做一些简单的改进来加速这个过程。所有这些都是独立的,你可以尝试所有的或只是几个,看看是否足够快。它们大致按难度的升序排列:

  • 使用psycopg2数据库驱动程序,速度更快
  • 在事务中包装整个插入块。如果您使用的是psycopg2,那么您已经在做了—它会自动打开一个事务,您必须在最后commit。在
  • 在一个数组中收集几行值,每n行执行一次多值插入。在
  • 使用多个连接通过helper进程执行插入-请参阅multiprocessing模块。由于GIL(全局解释器锁)问题,线程无法正常工作。在

如果不想使用一个大事务,可以设置synchronous_commit = off并设置commit_delay,这样连接可以在磁盘刷新实际完成之前返回。如果你在一个事务中完成所有的工作,这对你没有多大帮助。在

多值插入

Psycopg2不直接支持多值INSERT,但您可以写下:

curs.execute("""
INSERT INTO blah(a,b) VALUES
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s);
""", parms);

然后用类似的东西循环:

^{pr2}$

相关问题 更多 >