pysp的还原和计数结果不同

2024-09-28 19:09:11 发布

您现在位置:Python中文网/ 问答频道 /正文

对于spark测试,我下载了NY taxi csv files并将它们合并到一个文件中,纽约出租车.csv. 然后我把这个保存在hadoopfs中。我正在使用spark on纱线和7个节点管理器。在

我正在连接spark over Ipython笔记本。在

下面是一个示例python脚本,用于计算纽约出租车.csv. 在

nytaxi=sc.textFile("hdfs://bigdata6:8020/user/baris/nytaxi/nytaxi.csv")
filtered=nytaxi.filter(lambda x:"distance" not in x)
splits = filtered.map(lambda x: float(x.split(",")[9]))
splits.cache()
splits.count()

返回73491693。 但是,当我试图用下面的代码计算行数时,它返回一个大约803000的值。在

^{pr2}$

我想知道为什么结果不同。 谢谢

csv中的采样线: u'740BD5BE61840BE4FE3905CC3EBE3E7E,E48B185060FB0FF49BE6DA43E69E624B,CMT,1,N,2013-10-01 12:44:292013-10-01 12:53:26,1536,1.20,-73.974319,40.741859,-73.99115,40.742424'


Tags: 文件csvlambda节点onfilesfilteredspark
3条回答

^{}的文档说明:

Reduces the elements of this RDD using the specified commutative and associative binary operator.

def plusOne(sum, v): return sum + 1不是可交换的。它完全忽略其中一个参数。所以你看到的是未定义的行为。(我建议考虑为什么函数必须是可交换的。如果你明白了,你对火花的理解就更好了!)在

解决方案是使用RDD.count()。但是,如果您坚持使用reduce(),您可以这样做:

def count(rdd):
  return rdd.map(lambda x: 1).reduce(lambda a, b: a + b)

这不是完整答案

因为我不能把我的发现发表评论,所以我写在这里。在

我可以用一个简单得多的例子来重现你的问题。在

data = xrange(1, 10000)
len(data) #output => 9999
xrangeRDD = sc.parallelize(data, 8)
print xrangeRDD.count()
def plusOne (v,sum):
  #print sum, v
  return v + 1;
a = xrangeRDD.reduce(plusOne)
print a

输出

^{pr2}$

对于xrangeRDD = sc.parallelize(data, 4)

输出

9999
2502

对于xrangeRDD = sc.parallelize(data, 1)

输出

9999
9999

因为我只是改变了分区的数量,这也改变了reduce的输出,所以我认为reduce只给出了一个分区的输出,正如这里的模式所建议的那样。在

我还在学习spark是如何工作的。所以我不知道为什么会这样。我希望有人能用这些额外的细节来解释这背后的原因。在

问题是Daniel在中指定的,reduce中使用的操作必须是关联的和可交换的。Here's the reason from the source itself

val reducePartition: Iterator[T] => Option[T] = iter => {
  if (iter.hasNext) {
    Some(iter.reduceLeft(cleanF))
  } else {
    None
  }
}

注意,对每个分区所做的reduce是对其迭代器的reduceLeft的简单委派。这不会引起任何问题,因为这只是价值的积累。在

^{pr2}$

但是,分区的合并是个问题。在您的示例中,它是如何分解的(假设在4个平均分割的分区上有40个计数):

A = 10; B = 10; C = 10; D = 10 //Local reductions. Everything ok
A added in = 10 //Still ok
B added in = f(10, 10) = 11 //Because your definition of f is (first + 1)
                            //This drops the second param of 10
C added in = f(11, 10) = 12 //Again, only adding 1 instead of the actual 10 count

所以,您应该选择count,或者按照Daniel的建议和map来做,或者你有第三个选择来做{}

 rdd.aggregate(0)(_+1, _+_)

这将使用0作为计数的种子,继续在本地向累加器添加1,然后在合并中将两个累加器相加。在

相关问题 更多 >