<p>Here is code to transpose a DataFrame (via its RDD) in PySpark:</p>
<pre><code>from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

dt1 = {'avg_length': [40.0, 9.0, 5.284, 5.047, 6.405, 13.0]}
dt = sc.parallelize([(k,) + tuple(v) for k, v in dt1.items()]).toDF()
dt.show()

# - Transpose Code -
# Grab data from the first column, since it will become the new column headers
new_header = [i[0] for i in dt.select("_1").rdd.map(tuple).collect()]
# Remove the first column from the DataFrame
dt2 = dt.select([c for c in dt.columns if c != '_1'])
# Convert the DataFrame to an RDD of tuples
rdd = dt2.rdd.map(tuple)
# Transpose: tag every cell with its (row, column) index
rddT1 = rdd.zipWithIndex().flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])
# Group the cells by column index, keeping the row index with each value
rddT2 = rddT1.map(lambda t: (t[1], (t[0], t[2]))).groupByKey().sortByKey()
# Within each group, order the values by their original row index
rddT3 = rddT2.map(lambda kv: sorted(kv[1], key=lambda ie: ie[0]))
# Drop the row indices, keeping only the values
rddT4 = rddT3.map(lambda row: tuple(e for _, e in row))
# Convert back to a DataFrame (along with the header)
df = rddT4.toDF(new_header)
df.show()
</code></pre>
<p>After transposing, you can simply merge the two DataFrames.
I hope this helps.</p>