无法在sp中连续执行2个groupBy

2024-10-01 02:32:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在和Spark一起研究python。在

我的问题是:我有一个.csv文件,其中包含一些数据(int1,int2,int3,date)。我在int1上做了groupByKey。现在我想用第一个groupBy在我的日期执行另一个groupBy创建的rdd。在

问题是我做不到。有什么想法吗?在

问候

编辑2: 从pyspark导入SparkContext 导入csv 导入系统 导入StringIO

sc = SparkContext("local", "Simple App")
file = sc.textFile("histories_2week9.csv")

 csvById12Rdd=file.map(lambda (id1,id2,value): ((id1,id2),value)).groupByKey()
 csvById1Rdd=csvById12Rdd.map(lambda ((id1,id2),group):(id1, (id2,group))).groupByKey()



def printit(one):
  id1, twos=one
  print("Id1:{}".format(id1))
    for two in twos:
      id2, values=two
      print("Id1:{} Id2:{}".format(id1,id2))
     for value in values:
        print("Id1:{} Id2:{} Value:{}".format(id1,id2,value))


  csvById12Rdd.first().foreach(printit)

csv就像 31705,48,22014-10-28218:14:09.000Z

编辑3:

我可以用这个代码打印迭代器数据

^{pr2}$

但我还是不能分组


Tags: csv数据format编辑valuescprintid2
1条回答
网友
1楼 · 发布于 2024-10-01 02:32:05

groupby返回一个RDD(Key,Iterable[Value]),可以反过来吗?在

  1. 按id1id2分组,得到RDD((id1,id2),Iterable[Value])
  2. 然后按id1单独分组得到RDD(id1,Iterable[(Id2,Iterable[Value]))

比如:

csv=[(1,1,"One","Un"),(1,2,"Two","Deux"),(2,1,"Three","Trois"),(2,1,"Four","Quatre")]
csvRdd=sc.parallelize(csv)
# Step 1
csvById12Rdd=csvRdd.map(lambda (id1,id2,value1,value2): ((id1,id2),(value1,value2))).groupByKey()
# Step 2
csvById1Rdd=csvById12Rdd.map(lambda ((id1,id2),group):(id1, (id2,group))).groupByKey()
# Print    
def printit(one):
    id1, twos=one
    print("Id1:{}".format(id1))
    for two in twos:
        id2, values=two
        print("Id1:{} Id2:{}".format(id1,id2))
        for value1,value2 in values:
            print("Id1:{} Id2:{} Values:{} {}".format(id1,id2,value1,value2))

csvById1Rdd.foreach(printit)

相关问题 更多 >