PySpark: getting related rows from a DataFrame's list-valued column

Posted 2024-09-28 15:36:46


I have a dataframe with an ID column and an associated array column that holds the IDs of each record's related records:

ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456]
345 | alen | [789]
456 | sam  | [789,999]
789 | marc | [111]
555 | dan  | [333]

From the above, I need to build a relationship between all the related child IDs and their parent IDs, so that each parent row's list is expanded with its children's related IDs:

ID | NAME | RELATED_IDLIST
--------------------------
123 | mike | [345,456,789,999,111]
345 | alen | [789,111]
456 | sam  | [789,999,111]
789 | marc | [111]
555 | dan  | [333]
 

I need help figuring out how to do the above.


1 Answer

#1 · Posted 2024-09-28 15:36:46

This can be solved with self joins and window functions. I have split the code into five steps. The algorithm is as follows:

  1. Explode the array list so that each related ID becomes its own record (no more arrays in the data)
  2. Self-join the exploded data on the ID and related (the renamed RELATED_IDLIST) columns
  3. Collapse the records sharing the same a_id into one array, and the b_id side into another array
  4. Merge the two array columns into one combined array and rank the resulting records by the size of each combined array
  5. Pick the records with rank 1

You can try the following code:

# importing necessary functions for later use
from pyspark.sql.functions import explode, col, collect_set, array_union, size
from pyspark.sql.functions import dense_rank, desc
from pyspark.sql.window import Window  # needed for the window specifications in steps 3 and 4

# cross joins must be enabled explicitly if the Spark version is < 3
spark.conf.set("spark.sql.crossJoin.enabled", True)

############### STEP 0 #####################################
# creating the above mentioned dataframe
id_cols = [123,345,456,789,555]
name_cols = ['mike','alen','sam','marc','dan']
related_idlist_cols = [[345,456],[789],[789,999],[111],[333]]

list_of_rows = list(zip(id_cols, name_cols, related_idlist_cols))
cols_name = ['ID','NAME','RELATED_IDLIST']

# this will result in above mentioned dataframe
df = spark.createDataFrame(list_of_rows,cols_name)

############### STEP 1: Explode values  #####################################
# explode turns each element of the array column into its own record;
# one record whose array has two elements becomes two records:
#                                        +-> 123, mike, 345
# 123, mike, explode([345, 456]) -------|
#                                        +-> 123, mike, 456
df_1 = df.select(col('id'), col('name'), explode(df.RELATED_IDLIST).alias('related'))

############### STEP 2 : Self Join with Data  #####################################
# creating two copies of the dataframe with different column names, for joining them later
a = df_1.withColumnRenamed('id', 'a_id').withColumnRenamed('name', 'a_name').withColumnRenamed('related', 'a_related')
b = df_1.withColumnRenamed('id', 'b_id').withColumnRenamed('name', 'b_name').withColumnRenamed('related', 'b_related')

# left outer self join: each related ID on the a side is matched to its own row on the b side
df_2 = a.join(b, a.a_related == b.b_id, how='left').orderBy(a.a_id)

############### STEP 3 : create Array Lists #####################################
# collect_set reduces values of a particular kind into one set
# (here, the 'related' IDs are collected per 'id' on each side of the join)
df_3 = df_2.select(
    'a_id', 'a_name',
    collect_set('a_related').over(Window.partitionBy(df_2.a_id)).alias('a_related_ids'),
    collect_set('b_related').over(Window.partitionBy(df_2.b_id)).alias('b_related_ids'))

# merging the two sets into one column and also computing the resulting array size
df_4 = df_3.select('a_id', 'a_name', array_union('a_related_ids', 'b_related_ids').alias('combined_ids')).withColumn('size', size('combined_ids'))

# ranking the records so the fullest combined array can be picked per id
df_5 = df_4.select('a_id', 'a_name', 'combined_ids', dense_rank().over(Window.partitionBy('a_id').orderBy(desc('size'))).alias('rank'))

############### STEP 4 : Selecting Ideal Records  #####################################
# picking the rank-1 records; these still contain duplicates, so remove them with distinct() and order by id
df_6 = df_5.select('a_id', 'a_name', 'combined_ids').filter(df_5.rank == 1).distinct().orderBy('a_id')

############### STEP 5 #####################################
display(df_6)
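
Note that display is a Databricks notebook helper rather than part of PySpark itself; in a plain PySpark session you can print the final dataframe instead (a minimal sketch using the df_6 built above):

# show() prints the dataframe to stdout in any PySpark session,
# unlike display(), which is only available in Databricks notebooks
df_6.show(truncate=False)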
       
