from pyspark.sql import functions as f
from pyspark.sql import types as t
def containsUdf(listColumn):
row = {}
for column in list_of_column_names:
if(column in listColumn):
row.update({column: 1})
else:
row.update({column: 0})
return row
callContainsUdf = f.udf(containsUdf, t.StructType([t.StructField(x, t.StringType(), True) for x in list_of_column_names]))
df.withColumn('struct', callContainsUdf(df['list_column']))\
.select(f.col('list_column'), f.col('struct.*'))\
.show(truncate=False)
import pyspark.sql.functions as F
exprs = [F.when(F.array_contains(F.col('list_column'), column), 1).otherwise(0).alias(column)\
for column in list_column_names]
df = df.select(['list_column']+exprs)
withColumn
已经发布了,因此除了已有的之外,很难找到一种更快的方法。您可以尝试定义一个udf
函数,如下所示它应该给你
^{pr2}$注意:
list_of_column_names = ["Foo","Bar","Baz"]
您的代码没有什么特别的错误,除了非常广泛的数据:
只生成执行计划。在
一旦结果被评估,实际的数据处理将并行化。在
然而,这是一个昂贵的过程,因为它需要O(NMK)操作,列表中有N行、M列和K值。在
此外,在非常宽的数据上执行计划的计算成本非常高(尽管在记录数量方面成本是不变的)。如果它成为一个限制因素,最好使用
RDDs
:RDD
。在你可以这样接近
相关问题 更多 >
编程相关推荐