我想用一个pyspark accumulator将从rdd
推断的值相加填充一个矩阵;我发现文档有点不清楚。添加一点背景,以防万一。
MyrddData
包含索引列表,其中一个计数必须添加到矩阵中。例如,此列表映射到索引:[1,3,4] -> (11), (13), (14), (33), (34), (44)
现在,这是我的累加器:
from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
def zero(self, mInitial):
import numpy as np
aaZeros = np.zeros(mInitial.shape)
return aaZeros
def addInPlace(self, mAdd, lIndex):
mAdd[lIndex[0], lIndex[1]] += 1
return mAdd
这是我的映射函数:
^{pr2}$然后运行数据:
oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())
rddData.map(populate_sparse).collect()
现在,当我看我的数据时:
sum(sum(oAccumilatorMatrix.value))
#= 0.0
不应该是这样的。我错过了什么?在
编辑 一开始尝试用稀疏矩阵,得到了不支持稀疏矩阵的回溯。密集矩阵问题:
...
raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.
啊哈!我想我明白了。在一天结束的时候,这个累加器仍然需要添加自己的部分。因此,将
addInPlace
更改为:所以现在它在给定一个列表时添加索引,并在
populate_sparse
函数循环之后添加自己,以创建我的最终矩阵。在相关问题 更多 >
编程相关推荐