Spark如何基于模棱两可的名称获取所有相关列

2024-10-01 05:04:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下几个数据帧。目标是使用product_name作为键来查找所有相关信息。问题是,有时它被称为prod_name或其他类似的名称。另外,例如,如果product_name链接到ser_no,则该数据帧中的信息也属于该产品。下面的例子,让我知道我是否可以更好地解释这一点

有人能帮我解释一下吗?我正在尝试在不显式地将它们连接在一起的情况下自动化这个过程,因为有许多这样的表,我不知道所有确切的表/列名称。基本上,我试图从这些配置单元表中提取与aproduct_name相关的所有信息。在Spark graphX api中感觉像connectedComponent,但不完全一样?非常感谢你的帮助

df1:

prod_name    value 
    A      code1
    B      code2
    C      code3, code 33
    D       code4

df2:

product_name.    jvalue 
  A            indi3
  B            indi4
  C            indi5
  D            indi6

df3:

   product_name.    ser_no 
     A               100
     B               200
     C               300
     D               400

df4:

  colour.         ser_no 
  Amber           100
  Blue            200
  Orange          300
  Green           400

预期产出:

product_no    value          jvalue     ser_no   colour 
    A        code1.           indi3.     100.     Amber
    B        code2            indi4.     200      Blue
    C        code3, code 33.  indi5      300      Orange
    D        code4            indi6      400      Green 

Tags: 数据noname名称信息valuecodeprod
1条回答
网友
1楼 · 发布于 2024-10-01 05:04:49

你的问题似乎有三个部分

  1. 发现具有相同逻辑列的所有表,即使确切的列名不同

  2. 发现一组数据帧中的所有foreign keys

  3. 基于适当的联接键将数据帧集合联接在一起,即使联接键位于每个数据帧的不同列名下

子问题1

首先,我们需要一个方法来检查列名是否与逻辑列匹配。我们将使用它作为过滤器。 正则表达式是一种合理的方法,但在本例中,我将简单地列举一组应被视为相同的列名

val productNameColumns = Seq("prod_name", "product_name", "product")

既然您提到了Hive,我假设我们可以使用Spark目录来发现表。我们可以使用如下代码获取所有表中与逻辑列匹配的所有列

import org.apache.spark.sql.functions.{col, lit}

val allTables = spark.catalog.listTables().select("name").as[String].collect

val tableColumns = allTables
    .map { tableName => 
        // If you have many tables, this will start many spark jobs. This may be too slow to be feasible.
        spark.catalog
            .listColumns(tableName)
            .where(col("name").isin(productNameColumns: _*))
            // or `.where(col("name").rlike(some-regex))` if using a regex
            .select(
                lit(tableName).as("table"),
                col("name").as("column")
            )
            // Make sure that we only take 1 column per table. This may not be needed depending on your data.
            .groupBy("table")
            .agg(
                first(col("column")).as("column")
            )
    }
    .reduce(_ union _)
    .as[(String, String)]
    .collect
    .toMap

以上代码假定您只关心当前数据库。如果需要,可以迭代所有数据库

{}图是子问题1的答案。键是表名,值是与“产品名”对应的列名

子问题2

子问题2是外键发现的经典示例。这本身就是一个完整的研究领域I suggestdo some reading 在沿着这条路走之前。这看起来很简单,但实际上很难。当我们 开始讨论由多列组成的键

让我们把这一特性放在一边,继续讨论子问题3

子问题3

子问题3最简单的解决方案是标准化您将加入的列名。这很容易,因为我们已经知道这个专栏了 解决子问题1后每个数据帧的名称。我们可以在映射上迭代并重命名要加入的列,然后减少加入

我假设了一个"full_outer"连接,因为这样我们就不会丢失任何记录的信息。当然,无论您选择哪种联接类型 如果联接键不是所有数据帧的主键(跨行唯一),则可能会出现行爆炸。这很可能会发生 不管怎样,都是一个昂贵的数据帧

tableColumns
    .map { case (tableName, columnName) =>
        spark.table(tableName).withColumnRenamed(columnName, "__join_column")
    }
    .reduce { case (accDf, nextDf) =>
        accDf.join(nextDf, Seq("__join_column"), "full_outer")
    }

相关问题 更多 >