pythonspark如何使用rddapi按组查找累计和

2024-10-01 09:31:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我是火花编程新手。需要spark python程序的帮助,在这里我有这样的输入数据,并希望获得每个组的累计摘要。感谢有人在这方面指导我。在

输入数据:

11,1,1100年

11,1,2150年

12,1,1,50

12,2,1,70

12,2,2,20

需要这样的输出数据:

11,1,1100年

11,1,2250/(100+150)

12,1,1,50

12,2,1,70

12,2,2,90/(70+20)

我尝试的代码:

def parseline(line):
    fields = line.split(",")
    f1 = float(fields[0])
    f2 = float(fields[1])
    f3 = float(fields[2])
    f4 = float(fields[3])
    return (f1, f2, f3, f4)

input = sc.textFile("FIle:///...../a.dat")
line = input.map(parseline)
linesorted = line.sortBy(lambda x: (x[0], x[1], x[2]))
runningpremium = linesorted.map(lambda y: (((y[0], y[1]),     y[3])).reduceByKey(lambda accum, num: accum + num)

for i in runningpremium.collect():
      print i

Tags: 数据lambdamapfieldsinputlinefloatf2
2条回答

使用Dataframe API

from pyspark.sql.types import StructType, StringType, LongType,StructField
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
sc= spark.sparkContext

rdd = sc.parallelize([(11, 100),(11, 150),(12, 50),(12, 70),(12, 20)])

schema = StructType([
    StructField("id", StringType()),
    StructField("amount", LongType())
    ])

df = spark.createDataFrame(rdd, schema)

df.registerTempTable("amount_table")
df.show();
df2 = spark.sql("SELECT id,amount, sum(amount) OVER (PARTITION BY id ORDER BY amount) as cumulative_sum FROM amount_table")
df2.show()

使用RDD API请尝试以下操作:

^{pr2}$

输出:

[(11, (1, 2, 100)), (11, (2, 1, 150)), (12, (1, 2, 50)), (12, (1, 3, 70)), (12, (3, 4, 20))]
[[11, 1, 2, 100], [11, 2, 1, 250], [12, 1, 2, 50], [12, 1, 3, 120], [12, 3, 4, 140]]

一个简单的例子只涉及两列(每个记录中有2个值)

rdd=sc.parallelize([(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)])
from itertools import accumulate

def cumsum(values):
    return list(accumulate(values))
print(rdd.groupByKey().mapValues(cumsum).collect())
print(rdd.groupByKey().mapValues(cumsum).flatMapValues(lambda x:x).collect())

输出:

[(11, [100, 250]), (12, [50, 120, 140])]
[(11, 100), (11, 250), (12, 50), (12, 120), (12, 140)]

在注释中,可以使用window函数对Spark Dataframe进行累计和。首先,我们可以创建一个带有伪列'a', 'b', 'c', 'd'的示例数据帧

ls = [(11,1,1,100), (11,1,2,150), (12,1,1,50), (12,2,1,70), (12,2,2,20)]
ls_rdd = spark.sparkContext.parallelize(ls)
df = spark.createDataFrame(ls_rdd, schema=['a', 'b', 'c', 'd'])

您可以按列ab进行分区,然后按列c排序。然后,将sum函数应用于末尾的列d

^{pr2}$

输出

+ -+ -+ -+   -+
|  a|  b|  c|cum_sum|
+ -+ -+ -+   -+
| 11|  1|  1|    100|
| 11|  1|  2|    250|
| 12|  1|  1|     50|
| 12|  2|  1|     70|
| 12|  2|  2|     90|
+ -+ -+ -+   -+

相关问题 更多 >