<p>The problem, as Daniel pointed out, is that the operation used in <code>reduce</code> must be associative and commutative. <a href="https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L961" rel="nofollow">Here's the reason from the source itself</a>:</p>
<pre><code>val reducePartition: Iterator[T] => Option[T] = iter => {
  if (iter.hasNext) {
    Some(iter.reduceLeft(cleanF))
  } else {
    None
  }
}
</code></pre>
<p>Notice that the <code>reduce</code> applied to each partition is simply a delegation to <code>reduceLeft</code> on its iterator. This doesn't cause any problems, because it is just an accumulation of values.</p>
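<p>As a small plain-Scala sketch of why the local step is fine (assuming the elements have already been mapped to <code>1</code> and a partition holds ten of them, as in the breakdown below), <code>reduceLeft</code> with the same counting function still accumulates correctly:</p>
<pre><code>// One partition's iterator: ten elements, each mapped to 1
val partition = Iterator.fill(10)(1)

// Same "count" function: keep the accumulator, ignore the element, add 1
val localCount = partition.reduceLeft((acc, _) => acc + 1)
// localCount == 10 -- the purely local accumulation gives the right count
</code></pre>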
<p>However, merging the partition results is where it breaks. In your example, here is how it plays out (assuming a count of 40 split evenly across 4 partitions):</p>
<pre><code>A = 10; B = 10; C = 10; D = 10 //Local reductions. Everything ok
A added in = 10 //Still ok
B added in = f(10, 10) = 11 //Because your definition of f is (first + 1)
//This drops the second param of 10
C added in = f(11, 10) = 12 //Again, only adding 1 instead of the actual 10 count
D added in = f(12, 10) = 13 //Final merged result is 13, not the expected 40
</code></pre>
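<p>A minimal spark-shell sketch that reproduces this (a hypothetical session; it assumes the 40 elements have already been mapped to <code>1</code> and are spread over 4 partitions):</p>
<pre><code>// 40 ones in 4 partitions, as in the breakdown above
val rdd = sc.parallelize(Seq.fill(40)(1), 4)

// Each partition reduces locally to 10, but every merge of two partial
// results only computes acc + 1 and drops the second partition's count
val broken = rdd.reduce((acc, _) => acc + 1)
// broken == 13, not 40
</code></pre>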
<p>So, you can either go with <code>count</code>, follow Daniel's suggestion and <code>map</code> first, or you have a third option: <code>aggregate</code></p>
<pre><code>rdd.aggregate(0)((acc, _) => acc + 1, _ + _)
</code></pre>
<p>This seeds the count with 0, keeps adding 1 to the accumulator locally, and then adds the two accumulators together during the merge.</p>
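<p>For completeness, a hypothetical spark-shell session showing the three options side by side (again assuming 40 elements in 4 partitions):</p>
<pre><code>val rdd = sc.parallelize(1 to 40, 4)

rdd.count()                                   // 40 -- the built-in
rdd.map(_ => 1).reduce(_ + _)                 // 40 -- Daniel's map-then-sum; + is associative and commutative
rdd.aggregate(0)((acc, _) => acc + 1, _ + _)  // 40 -- seqOp bumps the local count, combOp adds the counts
</code></pre>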