<p><strong>Using the DataFrame API</strong>:</p>
<pre><code>from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)])
# The ids are integers, so the field must be LongType (StringType would
# fail schema verification in createDataFrame)
schema = StructType([
    StructField("id", LongType()),
    StructField("amount", LongType())
])
df = spark.createDataFrame(rdd, schema)

# registerTempTable is deprecated; createOrReplaceTempView is the current API
df.createOrReplaceTempView("amount_table")
df.show()

df2 = spark.sql(
    "SELECT id, amount, "
    "sum(amount) OVER (PARTITION BY id ORDER BY amount) AS cumulative_sum "
    "FROM amount_table"
)
df2.show()
</code></pre>
<p><strong>Using the RDD API</strong>, try the following:</p>
^{pr2}$
<p>Output:</p>
<pre><code>[(11, (1, 2, 100)), (11, (2, 1, 150)), (12, (1, 2, 50)), (12, (1, 3, 70)), (12, (3, 4, 20))]
[[11, 1, 2, 100], [11, 2, 1, 250], [12, 1, 2, 50], [12, 1, 3, 120], [12, 3, 4, 140]]
</code></pre>
<p><strong>A simpler example</strong> involving only two columns (2 values per record):</p>
<pre><code>from itertools import accumulate

rdd = sc.parallelize([(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)])

def cumsum(values):
    # Running total over the values of one key
    return list(accumulate(values))

# Cumulative sums grouped per key
print(rdd.groupByKey().mapValues(cumsum).collect())
# Flattened back to (key, running_total) pairs
print(rdd.groupByKey().mapValues(cumsum).flatMapValues(lambda x: x).collect())
</code></pre>
<p>Output:</p>
<pre><code>[(11, [100, 250]), (12, [50, 120, 140])]
[(11, 100), (11, 250), (12, 50), (12, 120), (12, 140)]
</code></pre>
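<p>For reference, the grouping-and-accumulation logic above can be sketched in plain Python without Spark (assuming per-key input order is preserved, as <code>groupByKey</code> does within each key):</p>

```python
from collections import defaultdict
from itertools import accumulate

data = [(11, 100), (11, 150), (12, 50), (12, 70), (12, 20)]

# Group amounts by key, keeping input order (the groupByKey step)
groups = defaultdict(list)
for key, amount in data:
    groups[key].append(amount)

# Running total per key (the mapValues(cumsum) step)
grouped = {key: list(accumulate(vals)) for key, vals in groups.items()}

# Unroll back to (key, running_total) pairs (the flatMapValues step)
flat = [(key, v) for key, vals in grouped.items() for v in vals]

print(grouped)  # {11: [100, 250], 12: [50, 120, 140]}
print(flat)
```

<p>The output matches the RDD version above; the Spark variant simply distributes the same per-key accumulation across partitions.</p>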