带MRJob的多个输入

2024-06-26 13:46:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试学习使用Yelp的Python API来实现MapReduce,MRJob。他们简单的反词示例很有意义,但我很好奇如何处理涉及多个输入的应用程序。例如,不是简单地计算文档中的单词,而是将向量乘以矩阵。我想出了这个解决方案,它起作用,但感觉很傻:

class MatrixVectMultiplyTast(MRJob):
    def multiply(self,key,line):
            line = map(float,line.split(" "))
            v,col = line[-1],line[:-1]

            for i in xrange(len(col)):
                    yield i,col[i]*v

    def sum(self,i,occurrences):
            yield i,sum(occurrences)

    def steps(self):
            return [self.mr (self.multiply,self.sum),]

if __name__=="__main__":
    MatrixVectMultiplyTast.run()

这段代码运行./matrix.py < input.txt,之所以有效,是因为input.txt中的矩阵按列存储,相应的向量值在行的末尾。

所以,下面的矩阵和向量:

enter image description here

表示为input.txt:

enter image description here

简而言之,我该如何更自然地将矩阵和向量存储在单独的文件中,并将它们都传递到MRJob中?


Tags: selftxtinputdefline矩阵col向量
3条回答

您的问题的实际答案是,mrjob还不完全支持hadoop流式连接模式,即读取map_input_file环境变量(该变量公开map.input.file属性)以根据其路径和/或名称确定要处理的文件类型。

如果您可以很容易地从读取数据本身中检测出它属于哪种类型,那么您仍然可以将它拉下来,如本文所示:

http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

然而,这并不总是可能的。。。

否则我的工作看起来棒极了,我希望他们能在未来支持我。在那之前,这对我来说几乎是个交易破坏者。

这就是我如何使用多个输入,并根据文件名在映射程序阶段进行适当的更改。

跑步计划:

from mrjob.hadoop import *


#Define all arguments

os.environ['HADOOP_HOME'] = '/opt/cloudera/parcels/CDH/lib/hadoop/'
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))
job_running_time = datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S')
hadoop_bin = '/usr/bin/hadoop'
mode = 'hadoop'
hs = HadoopFilesystem([hadoop_bin])

input_file_names = ["hdfs:///app/input_file1/","hdfs:///app/input_file2/"]

aargs = ['-r',mode,'--jobconf','mapred.job.name=JobName','--jobconf','mapred.reduce.tasks=3','--no-output','--hadoop-bin',hadoop_bin]
aargs.extend(input_file_names)
aargs.extend(['-o',output_dir])
print aargs
status_file = True

mr_job = MRJob(args=aargs)
with mr_job.make_runner() as runner:
    runner.run()
os.environ['HADOOP_HOME'] = ''
print "HADOOP HOME is now set to : %s" % (str(os.environ.get('HADOOP_HOME')))

MRJob类:

class MR_Job(MRJob):
    DEFAULT_OUTPUT_PROTOCOL = 'repr_value'
    def mapper(self, _, line):
    """
    This function reads lines from file.
    """
    try:
        #Need to clean email.
        input_file_name = get_jobconf_value('map.input.file').split('/')[-2]
                """
                Mapper code
                """
    except Exception, e:
        print e

    def reducer(self, email_id,visitor_id__date_time):
    try:
        """
                Reducer Code
                """
    except:
        pass


if __name__ == '__main__':
    MRV_Email.run()

如果需要根据另一个(或同一行,行)数据集处理原始数据,可以:

1)创建一个S3 bucket来存储数据的副本。将此副本的位置传递给任务类,例如下面代码中的self.options.bucket和self.options.my_datafile_copy_location。警告:不幸的是,在处理之前,整个文件似乎必须“下载”到任务机器上。如果连接出现故障或加载时间过长,则此作业可能会失败。下面是一些Python/MRJob代码。

把这个放到mapper函数中:

d1 = line1.split('\t', 1)
v1, col1 = d1[0], d1[1]
conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)
data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()
### CAVEAT: Needs to get the whole file before processing the rest.
for line2 in data_copy.split('\n'):
    d2 = line2.split('\t', 1)
    v2, col2 = d2[0], d2[1]
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
conn.close()

2)创建一个SimpleDB域,并将所有数据存储在其中。 在boto和SimpleDB上阅读: http://code.google.com/p/boto/wiki/SimpleDbIntro

映射程序代码如下所示:

dline = dline.strip()
d0 = dline.split('\t', 1)
v1, c1 = d0[0], d0[1]
sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)
domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)
for item in domain:
    v2, c2 = item.name, item['column']
    ## Now, insert code to do any operations between v1 and v2 (or c1 and c2) here:
    yield <your output key, value pairs>
sdb.close()

如果数据量非常大,则第二个选项的性能可能会更好,因为它可以对每行数据而不是一次发出全部数据的请求。请记住,SimpleDB值的长度最多只能为1024个字符,因此,如果数据值超过此长度,则可能需要通过某种方法进行压缩/解压缩。

相关问题 更多 >