ruby EMR流作业,使用mapper和reducer的Java代码
我目前使用ruby编写的mapper和reducer代码运行流作业。我想把它们转换成java。我不知道如何使用java使用EMR hadoop运行流作业。亚马逊EMR网站cloudburst中给出的示例太复杂了。以下是我当前如何运行作业的详细信息
启动作业的代码:
elastic-mapreduce --create --alive --plain-output --master-instance-type m1.small
--slave-instance-type m1.xlarge --num-instances 2 --name "Job Name" --bootstrap-action
s3://bucket-path/bootstrap.sh
添加步骤的代码:
elastic-mapreduce -j <job_id> --stream --step-name "my_step_name"
--jobconf mapred.task.timeout=0 --mapper s3://bucket-path/mapper.rb
--reducer s3://bucket-path/reducerRules.rb --cache s3://bucket-path/cache/cache.txt
--input s3://bucket-path/input --output s3://bucket-path/output
映射程序代码从csv文件中读取,该文件在上文中被称为EMR的缓存参数,它还从同样具有一些csv文件的输入s3存储桶中读取,执行一些计算,并将csv输出行打印到标准输出
//mapper.rb
CSV_OPTIONS = {
// some CSV options
}
begin
file = File.open("cache.txt")
while (line = file.gets)
// do something
end
file.close
end
input = FasterCSV.new(STDIN, CSV_OPTIONS)
input.each{
// do calculations and get result
puts (result)
}
//reducer.rb
$stdin.each_line do |line|
// do some aggregations and get aggregation_result
if(some_condition) puts(aggregation_result)
end
# 1 楼答案
现在我在Hadoop和Mapreduce上有了更好的据点,以下是我的预期:
要启动集群,代码将与问题中的代码大致相同,但我们可以添加配置参数:
要添加作业步骤,请执行以下操作:
第一步:
ruby elastic-mapreduce jobflow <jobflo_id> jar s3://somepath/job-one.jar arg s3://somepath/input-one arg s3://somepath/output-one args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
第二步:
ruby elastic-mapreduce jobflow <jobflo_id> jar s3://somepath/job-two.jar arg s3://somepath/output-one arg s3://somepath/output-two args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
至于Java代码,将有一个主类,其中包含以下每个类的一个实现:
每种方法都必须重写map()和reduce()方法才能完成所需的工作
问题的Java类如下所示:
# 2 楼答案
如果使用的是java,就不会使用流媒体。直接根据MapReduce API构建Jar
查看hadoop源代码的examples文件夹,了解一些关于如何实现这一点的好例子,包括臭名昭著的wordcount: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples
我不完全确定为什么要使用Java,但直接编写API代码可能会很痛苦。您可能想尝试以下方法之一: Java项目:
非Java:
FWIW我想猪可能会是我的选择,并且在EMR上得到了开箱即用的支持