
Ruby EMR streaming job: Java code for the mapper and reducer

I currently run a streaming job whose mapper and reducer are written in Ruby, and I want to convert them to Java. I don't know how to run the job with Java on EMR Hadoop, and the CloudBurst sample given on the Amazon EMR site is too complicated. Here are the details of how I currently run the job.

Code to launch the job:

    elastic-mapreduce --create --alive --plain-output \
      --master-instance-type m1.small --slave-instance-type m1.xlarge \
      --num-instances 2 --name "Job Name" \
      --bootstrap-action s3://bucket-path/bootstrap.sh

Code to add a step:

    elastic-mapreduce -j <job_id> --stream --step-name "my_step_name" \
      --jobconf mapred.task.timeout=0 \
      --mapper s3://bucket-path/mapper.rb \
      --reducer s3://bucket-path/reducerRules.rb \
      --cache s3://bucket-path/cache/cache.txt \
      --input s3://bucket-path/input \
      --output s3://bucket-path/output

The mapper code reads from a CSV file, which is passed to EMR as the cache parameter above. It also reads from the input S3 bucket, which likewise holds some CSV files, performs some calculations, and prints CSV output rows to standard out.

# mapper.rb
require 'fastercsv'

CSV_OPTIONS = {
  # some CSV options
}

# Read the cache file shipped via the EMR --cache parameter
File.open("cache.txt") do |file|
  while (line = file.gets)
    # do something
  end
end

input = FasterCSV.new(STDIN, CSV_OPTIONS)
input.each do |row|
  # do calculations and get result
  puts(result)
end

# reducer.rb

$stdin.each_line do |line|
  # do some aggregations and get aggregation_result
  puts(aggregation_result) if some_condition
end
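The mapper and reducer above follow the streaming contract: the mapper writes key/value lines to standard out, Hadoop sorts and groups them by key, and the reducer aggregates the grouped lines from standard in. As a bridge to a Java version, here is a minimal single-process sketch of that contract (the sample records and the sum aggregation are made up for illustration, not taken from the job above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class StreamingModel {
    public static void main(String[] args) {
        List<String> input = Arrays.asList("b,2", "a,1", "a,3");

        // "Mapper": turn each CSV record into a key<TAB>value line
        List<String> mapped = input.stream()
                .map(line -> line.replace(',', '\t'))
                .collect(Collectors.toList());

        // Shuffle/sort: group values by key (TreeMap keeps keys sorted)
        Map<String, Integer> reduced = new TreeMap<>();
        for (String kv : mapped) {
            String[] parts = kv.split("\t");
            // "Reducer": aggregate the values per key (here: a sum)
            reduced.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
        }
        reduced.forEach((k, v) -> System.out.println(k + "\t" + v));
        // prints "a<TAB>4" then "b<TAB>2"
    }
}
```

In the real cluster the sort/group phase is done by Hadoop between the two processes; the Java classes in the answer below plug into that same model through the MapReduce API instead of stdin/stdout.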

2 Answers

  1. Answer #1

    Now that I have a better hold on Hadoop and MapReduce, here is what I expected:

    To launch the cluster, the command is roughly the same as in the question, but we can add configuration parameters:

    elastic-mapreduce --create --alive --plain-output \
      --master-instance-type m1.xlarge --slave-instance-type m1.xlarge \
      --num-instances 11 --name "Java Pipeline" \
      --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia \
      --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
      --args "--mapred-config-file,s3://com.versata.emr/conf/mapred-site-tuned.xml"

    To add the job steps:

    Step 1:

    elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-one.jar \
      --arg s3://somepath/input-one --arg s3://somepath/output-one \
      --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

    Step 2:

    elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-two.jar \
      --arg s3://somepath/output-one --arg s3://somepath/output-two \
      --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0

    As for the Java code, there will be one main class containing an implementation of each of the following classes:

    • org.apache.hadoop.mapreduce.Mapper
    • org.apache.hadoop.mapreduce.Reducer

    Each of these must override the map() and reduce() methods respectively to do the required work.

    The Java class for this question would look like the following:

    import java.io.File;
    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class SomeJob extends Configured implements Tool {
    
        private static final String JOB_NAME = "My Job";
    
        /**
         * This is Mapper.
         */
        public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {
    
            private Text outputKey = new Text();
            private Text outputValue = new Text();
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
    
                // Get the cached file
                Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
    
                File fileObject = new File (file.toString());
                // Do whatever required with file data
            }
    
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                outputKey.set("Some key calculated or derived");
                outputValue.set("Some Value calculated or derived");
                context.write(outputKey, outputValue);
            }
        }
    
        /**
         * This is Reducer.
         */
        public static class ReduceJob extends Reducer<Text, Text, Text, Text> {
    
            private Text outputKey = new Text();
            private Text outputValue = new Text();
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                    InterruptedException {
                outputKey.set("Some key calculated or derived");
                outputValue.set("Some Value calculated or derived");
                context.write(outputKey, outputValue);
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
    
            try {
                Configuration conf = getConf();
                DistributedCache.addCacheFile(new URI(args[2]), conf);
                Job job = new Job(conf);
    
                job.setJarByClass(SomeJob.class);
                job.setJobName(JOB_NAME);
    
                job.setMapperClass(MapJob.class);
                job.setReducerClass(ReduceJob.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
    
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                FileInputFormat.setInputPaths(job, args[0]);
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
                boolean success = job.waitForCompletion(true);
                return success ? 0 : 1;
            } catch (Exception e) {
                e.printStackTrace();
                return 1;
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            if (args.length < 3) {
                System.out
                        .println("Usage: SomeJob <comma separated list of input directories> <output dir> <cache file>");
                System.exit(-1);
            }
    
            int result = ToolRunner.run(new SomeJob(), args);
            System.exit(result);
        }
    
    }
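    Note that setup() above receives the cached file as an ordinary local path, so reading it afterwards is plain JDK file I/O rather than anything Hadoop-specific. A minimal self-contained sketch of that read (the temp file and its contents are made up for illustration; in the real job the path comes from DistributedCache.getLocalCacheFiles()):

    ```java
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Arrays;
    import java.util.List;

    public class CacheFileDemo {
        public static void main(String[] args) throws IOException {
            // Stand-in for the local copy of the DistributedCache file
            Path cache = Files.createTempFile("cache", ".txt");
            Files.write(cache, Arrays.asList("rule1,10", "rule2,20"));

            // Read it line by line, as the Ruby mapper did with cache.txt
            List<String> lines = Files.readAllLines(cache);
            for (String line : lines) {
                String[] fields = line.split(",");   // parse CSV fields as needed
                System.out.println(fields[0] + " -> " + fields[1]);
            }
            Files.deleteIfExists(cache);
        }
    }
    ```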
    
  2. Answer #2

    If you are using Java, you don't use streaming at all. You build your JAR directly against the MapReduce API.

    Check out the examples folder of the Hadoop source for some good examples of how to do this, including the infamous word count: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples

    I'm not entirely sure why you are using Java here, but coding directly against the API can be painful. You might want to try one of the following instead. Java projects:

    Non-Java:

    FWIW, I think Pig would probably be my choice, and it is supported out of the box on EMR.