对于spark测试,我下载了NY taxi csv files并将它们合并到一个文件中,纽约出租车.csv. 然后我把这个保存在hadoopfs中。我正在使用spark on纱线和7个节点管理器。在
我正在连接spark over Ipython笔记本。在
下面是一个示例python脚本,用于计算纽约出租车.csv. 在
nytaxi=sc.textFile("hdfs://bigdata6:8020/user/baris/nytaxi/nytaxi.csv")
filtered=nytaxi.filter(lambda x:"distance" not in x)
splits = filtered.map(lambda x: float(x.split(",")[9]))
splits.cache()
splits.count()
返回73491693。 但是,当我试图用下面的代码计算行数时,它返回一个大约803000的值。在
^{pr2}$我想知道为什么结果不同。 谢谢
csv中的采样线: u'740BD5BE61840BE4FE3905CC3EBE3E7E,E48B185060FB0FF49BE6DA43E69E624B,CMT,1,N,2013-10-01 12:44:292013-10-01 12:53:26,1536,1.20,-73.974319,40.741859,-73.99115,40.742424'
^{} 的文档说明:
def plusOne(sum, v): return sum + 1
不是可交换的。它完全忽略其中一个参数。所以你看到的是未定义的行为。(我建议考虑为什么函数必须是可交换的。如果你明白了,你对火花的理解就更好了!)在解决方案是使用
RDD.count()
。但是,如果您坚持使用reduce()
,您可以这样做:这不是完整答案
因为我不能把我的发现发表评论,所以我写在这里。在
我可以用一个简单得多的例子来重现你的问题。在
输出
^{pr2}$对于
xrangeRDD = sc.parallelize(data, 4)
输出
对于
xrangeRDD = sc.parallelize(data, 1)
输出
因为我只是改变了分区的数量,这也改变了reduce的输出,所以我认为reduce只给出了一个分区的输出,正如这里的模式所建议的那样。在
我还在学习spark是如何工作的。所以我不知道为什么会这样。我希望有人能用这些额外的细节来解释这背后的原因。在
问题是Daniel在中指定的,
reduce
中使用的操作必须是关联的和可交换的。Here's the reason from the source itself:注意,对每个分区所做的
^{pr2}$reduce
是对其迭代器的reduceLeft
的简单委派。这不会引起任何问题,因为这只是价值的积累。在但是,分区的合并是个问题。在您的示例中,它是如何分解的(假设在4个平均分割的分区上有40个计数):
所以,您应该选择}
count
,或者按照Daniel的建议和map
来做,或者你有第三个选择来做{这将使用0作为计数的种子,继续在本地向累加器添加1,然后在合并中将两个累加器相加。在
相关问题 更多 >
编程相关推荐