我用以下代码计算数据帧所有行之间的余弦相似性:
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
normalizer = Normalizer(inputCol="features", outputCol="norm")
data = normalizer.transform(transformed_df)
data = index_df(data)
mat = IndexedRowMatrix(
data.select("id", "norm")\
.rdd.map(lambda row: IndexedRow(row.id, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
indexed_dot = dot.toIndexedRowMatrix()
indexed_rdd = indexed_dot.rows
df = indexed_rdd.toDF()
当我使用数据帧的一个子集(100k行)时,它是有效的,但是当我尝试使用更多行(我的目标是300k行)时,我得到一个错误下面的。在
^{pr2}$任务似乎被困在某个特定级别并失败了好几次,所以经理就终止了它。在
你知道我怎么解决这个问题吗?在
在使用
yarn logs -applicationId <applicationId> -containerId <containerId>
调查日志后,问题似乎来自于一个不断失败的任务。Spark实现了容错,任务被重复,导致我的工作人员的磁盘空间不足(超过90%)。节点变得不正常,作业最终失败。在不过,这项任务为什么失败仍然是个谜。如果我知道那里发生了什么,我会更新的。在
相关问题 更多 >
编程相关推荐