在多行中分解具有密集向量的列

2024-10-04 07:24:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个包含两列的数据帧:BrandWatchErwaehnungIDword_countsword_counts列是`CountVectorizer(稀疏向量)的输出。删除空行之后,我创建了两个新列,一个是稀疏向量的索引,另一个是它们的值。在

help0 = countedwords_text['BrandWatchErwaehnungID','word_counts'].rdd\
    .filter(lambda x : x[1].indices.size!=0)\
    .map(lambda x : (x[0],x[1],DenseVector(x[1].indices) , DenseVector(x[1].values))).toDF()\
    .withColumnRenamed("_1", "BrandWatchErwaenungID").withColumnRenamed("_2", "word_counts")\
    .withColumnRenamed("_3", "word_indices").withColumnRenamed("_4", "single_word_counts")

由于spark不接受numpy.ndarray,所以在添加到数据帧之前,我需要将它们转换为密集向量。我的问题是我现在想分解word_indices列上的dataframe,但是来自pyspark.sql.functionsexplode方法只支持数组或映射作为输入。在

我试过:

^{pr2}$

得到以下错误:

cannot resolve 'explode(`word_indices')' due to data type mismatch: input to function explode should be array or map type

后来我试着:

help1 = help0.withColumn('b' , explode(help0.word_indices.toArray()))

也没用。。。 有什么建议吗?在


Tags: to数据lambdamaptype向量wordcounts
1条回答
网友
1楼 · 发布于 2024-10-04 07:24:42

您必须使用udf

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import *
from pyspark.ml.linalg import *

@udf("array<integer>")
def indices(v):
   if isinstance(v, DenseVector):
      return list(range(len(v)))
   if isinstance(v, SparseVector):
      return v.indices.tolist()

df = spark.createDataFrame([
   (1, DenseVector([1, 2, 3])), (2, SparseVector(5, {4: 42}))], 
   ("id", "v"))

df.select("id", explode(indices("v"))).show()

# + -+ -+
# | id|col|
# + -+ -+
# |  1|  0|
# |  1|  1|
# |  1|  2|
# |  2|  4|
# + -+ -+

相关问题 更多 >