使用(Py)Spark从Postgres并行读取和处理

2024-09-30 12:23:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个关于从Postgres数据库中读取大量数据并使用spark并行处理的问题。假设我在Postgres中有一个表,我想使用JDBC读入Spark。假设它有以下列:

  • id(bigint)
  • 日期(日期时间)
  • 许多其他列(不同类型)

当前Postgres表未分区。我希望并行转换大量数据,并最终将转换后的数据存储到其他地方

问题:我们如何优化博士后数据的并行读取

文档(https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)建议使用partitionColum并行处理查询。此外,需要设置lowerBoundupperBound。据我所知,在我的例子中,我可以使用列iddate表示partitionColumn。但是,这里的问题是如何在对其中一列进行分区时设置lowerBoundupperBound值。我注意到,如果设置不正确,在我的例子中会出现数据倾斜。对于Spark中的处理,我不关心自然分区。我只需要尽可能快地转换所有数据,所以我认为最好是针对非倾斜分区进行优化

我已经想出了一个解决办法,但我不确定这样做是否真的有意义。本质上,它是将id散列到分区中。我的解决方案是在id列上使用mod()和指定数量的分区。因此,中的dbtable字段类似于:

"(SELECT *, mod(id, <<num-parallel-queries>>) as part FROM <<schema>>.<<table>>) as t"

然后我使用partitionColum="part"lowerBound=0upperBound=<<num-parallel-queries>>作为Spark read JDBC作业的选项

请让我知道这是否有意义


Tags: 数据idmodparallelpostgresnum例子spark
1条回答
网友
1楼 · 发布于 2024-09-30 12:23:52

按主键列“划分”是个好主意

要获得大小相同的分区,请使用表统计信息:

SELECT histogram_bounds::text::bigint[]
FROM pg_stats
WHERE tablename = 'mytable'
  AND attname = 'id';

如果default_statistics_target的默认值为100,则这将是一个101个值的数组,将百分位从0到100进行分隔。您可以使用它来均匀地划分表

例如:如果数组看起来像{42,10001,23066,35723,49756,...,999960},并且需要50个分区,那么第一个分区将是所有带有id<;23066,第二行全部为23066≤ id<;49756,等等

相关问题 更多 >

    热门问题