PySp中高效的列处理

2024-06-30 15:50:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含大量列(大于30000列)的数据帧。在

我用10填充它,基于第一列,如下所示:

for column in list_of_column_names:
  df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))

但是这个过程需要很多时间。有没有一种方法可以更有效地做到这一点?有些东西告诉我列处理可以并行化。在

编辑:

样本输入数据

^{pr2}$

Tags: of数据indffornames过程时间
3条回答

withColumn已经发布了,因此除了已有的之外,很难找到一种更快的方法。您可以尝试定义一个udf函数,如下所示

from pyspark.sql import functions as f
from pyspark.sql import types as t

def containsUdf(listColumn):
    row = {}
    for column in list_of_column_names:
        if(column in listColumn):
            row.update({column: 1})
        else:
            row.update({column: 0})
    return row

callContainsUdf = f.udf(containsUdf, t.StructType([t.StructField(x, t.StringType(), True) for x in list_of_column_names]))

df.withColumn('struct', callContainsUdf(df['list_column']))\
    .select(f.col('list_column'), f.col('struct.*'))\
    .show(truncate=False)

它应该给你

^{pr2}$

注意:list_of_column_names = ["Foo","Bar","Baz"]

您的代码没有什么特别的错误,除了非常广泛的数据:

for column in list_of_column_names:
    df = df.withColumn(...)

只生成执行计划。在

一旦结果被评估,实际的数据处理将并行化。在

然而,这是一个昂贵的过程,因为它需要O(NMK)操作,列表中有N行、M列和K值。在

此外,在非常宽的数据上执行计划的计算成本非常高(尽管在记录数量方面成本是不变的)。如果它成为一个限制因素,最好使用RDDs

  • {column}使用
  • 将数据转换为RDD。在
  • 使用二进制搜索对每个列应用搜索。在

你可以这样接近

import pyspark.sql.functions as F

exprs = [F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)\
                  for column in list_column_names]

df = df.select(['list_column']+exprs)

相关问题 更多 >