遍历Spark RDD

2024-05-20 08:21:25 发布

您现在位置:Python中文网/ 问答频道 /正文

从Spark数据帧开始,创建矢量矩阵以进行进一步的分析处理。

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()

输出是一个向量数组。其中一些向量有一个空值

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])

在此基础上,我想遍历向量矩阵,如果向量包含空值,则创建一个0(零)的LabeledPoint数组,否则为1。

def f(row):
    if row.contain(None):
       LabeledPoint(1.0,row)
    else:
       LabeledPoint(0.0,row)

我试图用

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) #   create a generator of row sums
next(feature_matrix_labeledPoint) # Run the iteration protocol

但这不管用。

TypeError: 'PipelinedRDD' object is not iterable

任何帮助都很好


Tags: 数据矢量矩阵数组向量matrixfeaturespark
1条回答
网友
1楼 · 发布于 2024-05-20 08:21:25

RDDs不是Python列表的替换项。必须使用给定RDD上可用的操作或转换。在这里,您只需使用map

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint


feature_matrix_vectors = sc.parallelize([
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])

(feature_matrix_vectors
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
    .collect())

相关问题 更多 >