from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
def to_array(col):
    """Convert a Spark ML vector column into an ``ArrayType(DoubleType())`` column."""
    def _vector_to_list(v):
        # VectorUDT -> numpy array -> plain Python list of floats
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later.
    # It can be safely removed, i.e.
    #   return udf(_vector_to_list, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance.
    extract = udf(_vector_to_list, ArrayType(DoubleType())).asNondeterministic()
    return extract(col)
# Demo: add an array column "xs" from the "vector" column, then project
# "word" plus the first three array elements as separate columns.
df.withColumn("xs", to_array(col("vector"))).select(
    ["word"] + [col("xs")[i] for i in range(3)]
)
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
def splitVecotr(df, new_features=('f1', 'f2')):
    """Split the ``features`` vector column of *df* into separate double columns.

    NOTE(review): the public name keeps the original (misspelled) identifier
    for backward compatibility with existing callers.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain a ``features`` vector column. ``new_features`` should be
        the same length as the vector column length.
    new_features : sequence of str, optional
        Names for the new DoubleType columns. A tuple default replaces the
        original mutable-list default (shared-mutable-default pitfall).

    Returns
    -------
    pyspark.sql.DataFrame
        The original columns plus one nullable DoubleType column per entry
        in ``new_features``.
    """
    schema = df.schema
    cols = df.columns
    # Loop variable renamed from `col` so it no longer shadows
    # pyspark.sql.functions.col imported at the top of the file.
    for name in new_features:
        schema = schema.add(name, DoubleType(), True)
    # Relies on a `spark` SparkSession being available in the enclosing scope.
    return spark.createDataFrame(
        df.rdd.map(lambda row: [row[i] for i in cols] + row.features.tolist()),
        schema,
    )
一种可能的方法是转换到RDD和从RDD转换:
另一种解决方案是创建自定义项:
关于Scala等价物,请参见Spark Scala: How to convert Dataframe[vector] to DataFrame[f1: Double, ..., fn: Double]。
函数将特征向量列转换为单独的列
使用how-to-access-element-of-a-vectorudt-column-in-a-spark-dataframe中的自定义项要快得多
上面 zero323 在解决方案中给出的 extract 函数使用 toList:它创建一个 Python 列表对象,用 Python float 对象填充它,通过遍历列表来找到所需的元素,然后需要将该元素转换回 Java double;对每一行重复。使用 RDD 要比 to_array udf 慢得多(后者也调用 toList),但两者都比让 Spark SQL 处理大部分工作的 udf 慢得多。
比较rdd extract和此处提议的数组udf与来自3955864的udf的计时代码:
结果:
相关问题 更多 >
编程相关推荐