如何将numpy数组元素转换为spark RDD列值

2024-10-02 14:25:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我准备使用spark数据帧(而不是pandas)的内置CSV打印工具。我已经建立了一个索引矩阵。因此,其中有稀疏数组列。到密集阵列的转换由x执行。矢量.toArray()地图调用(下图)。在密集的数组中,单个的数据元素似乎无法从数组中释放出来。(请不要介绍pandas)如何将这个RDD转换成7列dataframe,它由一个string列和六个integer列组成?目前我的代码:

X = CoordinateMatrix(sc.parallelize(entries)) 
Xirm = X.toIndexedRowMatrix()
colnames = "username," + ','.join(str(cell) for cell in itemids.keys())  # Make CSV header line
# Might need this for speed: Arrow:  https://bryancutler.github.io/createDataFrame/  See above conf=...
XX = Xirm.rows.map(lambda x: (lu[x.index], x.vector.toArray())) # ?
print(XX.take(2))
df = XX.toDF() #TypeError: Can not infer schema for type: <class 'numpy.ndarray'>
#df.write.csv(header=colnames, path=out_filename)

以下是take(2)查看数据示例:

^{pr2}$

问题是RDD元组有2列,但我需要一个DataFrame中有7列。列数是动态确定的,colnames变量中有列名,但我不知道如何将其输入其中。同样,我们的目标是通过使用spark内置的DAtaFrame的CSV写入功能输出一个CSV文件“等价的”(许多部分文件是可以的)。(spark2.3.0是常驻的)reals将被理想地转换为int,并且没有任何数据值的引号。但是2到7列的转换是目前真正困难的问题。谢谢你的提示。在


Tags: csv数据pandasforcell数组内置spark