我正在尝试学习使用Yelp的Python API来实现MapReduce,MRJob。他们简单的反词示例很有意义,但我很好奇如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它起作用,但感觉很傻:
class MatrixVectMultiplyTast(MRJob):
def multiply(self,key,line):
line = map(float,line.split(" "))
v,col = line[-1],line[:-1]
for i in xrange(len(col)):
yield i,col[i]*v
def sum(self,i,occurrences):
yield i,sum(occurrences)
def steps(self):
return [self.mr (self.multiply,self.sum),]
if __name__=="__main__":
MatrixVectMultiplyTast.run()
这段代码运行./matrix.py < input.txt
,之所以有效,是因为input.txt中的矩阵按列存储,相应的向量值在行的末尾。
所以,下面的矩阵和向量:
表示为input.txt:
简而言之,我该如何更自然地将矩阵和向量存储在单独的文件中,并将它们都传递到MRJob中?
您的问题的实际答案是,mrjob还不完全支持hadoop流式连接模式,即读取map_input_file环境变量(该变量公开map.input.file属性)以根据其路径和/或名称确定要处理的文件类型。
如果您可以很容易地从读取数据本身中检测出它属于哪种类型,那么您仍然可以将它拉下来,如本文所示:
http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/
然而,这并不总是可能的。。。
否则我的工作看起来棒极了,我希望他们能在未来支持我。在那之前,这对我来说几乎是个交易破坏者。
这就是我如何使用多个输入,并根据文件名在映射程序阶段进行适当的更改。
跑步计划:
MRJob类:
如果需要根据另一个(或同一行,行)数据集处理原始数据,可以:
1)创建一个S3 bucket来存储数据的副本。将此副本的位置传递给任务类,例如下面代码中的self.options.bucket和self.options.my_datafile_copy_location。警告:不幸的是,在处理之前,整个文件似乎必须“下载”到任务机器上。如果连接出现故障或加载时间过长,则此作业可能会失败。下面是一些Python/MRJob代码。
把这个放到mapper函数中:
2)创建一个SimpleDB域,并将所有数据存储在其中。 在boto和SimpleDB上阅读: http://code.google.com/p/boto/wiki/SimpleDbIntro
映射程序代码如下所示:
如果数据量非常大,则第二个选项的性能可能会更好,因为它可以对每行数据而不是一次发出全部数据的请求。请记住,SimpleDB值的长度最多只能为1024个字符,因此,如果数据值超过此长度,则可能需要通过某种方法进行压缩/解压缩。
相关问题 更多 >
编程相关推荐