我使用MRjob在HBase实例上运行Hadoop流作业。在我的一生中,我不知道如何将一个参数传递给我的减速机。我有两个参数要在运行作业时传递给reducer:startDate和endDate。我现在的减速机是这样的:
def reducer(self, groupId, meterList):
"""
Print bucket.
"""
sys.stderr.write("Working on group = " + str(groupId) + "\n")
#print "Opening connection..."
conn = open_connection(hostname)
#print "Getting table..."
table = get_table(conn, tableName)
compositeDf = DataFrame()
for meterId in meterList:
sys.stderr.write("Querying: " + str(meterId) + "\n")
df = extract_meter_data(table, meterId, startDate, endDate)
我似乎不能将startDate和endDate作为参数传递给我的reducer。唯一能让工作获取参数的方法是通过类顶部的全局变量。在
^{pr2}$但那是肮脏的。我想把它从电话里传过来。我试过很多方法。将其设置为实例变量,将其设置为静态类变量,为MRDataQualityJob创建重载构造函数。。。。似乎什么都不管用。我从我的顶级脚本以编程方式调用它,如下所示:
if args.hadoop:
mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
mrdq_job = MRDataQuality(args=[meterFile])
with mrdq_job.make_runner() as runner:
runner.run()
不管我对mrdq_作业实例做了什么,看起来跑步者。跑步()正在使用未定义实例或静态变量的类的新实例。如何将参数传递给减速机????我可以在普通的Hadoop流式处理中通过传递一个字符串:“--reducer”减速器.pyarg1 arg2”。有没有和MRjob相当的?在
把参数传递给job config,然后用get_jobconf_值读取它们怎么样?在
像这样:
然后像上面那样在代码中设置参数
^{pr2}$把参数传递给job config,然后用reducer_init中的get_jobconf_value读取它们怎么样?这样,您只需读取一次参数。在
像这样:
然后像上面那样在代码中设置参数
^{pr2}$相关问题 更多 >
编程相关推荐