将参数传递给MRjob中的reducer

2024-10-01 11:37:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用MRjob在HBase实例上运行Hadoop流作业。在我的一生中,我不知道如何将一个参数传递给我的减速机。我有两个参数要在运行作业时传递给reducer:startDate和endDate。我现在的减速机是这样的:

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

我似乎不能将startDate和endDate作为参数传递给我的reducer。唯一能让工作获取参数的方法是通过类顶部的全局变量。在

^{pr2}$

但那是肮脏的。我想把它从电话里传过来。我试过很多方法。将其设置为实例变量,将其设置为静态类变量,为MRDataQualityJob创建重载构造函数。。。。似乎什么都不管用。我从我的顶级脚本以编程方式调用它,如下所示:

if args.hadoop:
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
    mrdq_job = MRDataQuality(args=[meterFile])

with mrdq_job.make_runner() as runner:
    runner.run()

不管我对mrdq_作业实例做了什么,看起来跑步者。跑步()正在使用未定义实例或静态变量的类的新实例。如何将参数传递给减速机????我可以在普通的Hadoop流式处理中通过传递一个字符串:“--reducer”减速器.pyarg1 arg2”。有没有和MRjob相当的?在


Tags: 实例hadoop参数作业tableargsjobrunner
2条回答

把参数传递给job config,然后用get_jobconf_值读取它们怎么样?在

像这样:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer(self, groupId, meterList):
    ...
    startDate = get_jobconf_value("my.job.settings.startdate")
    endDate = get_jobconf_value("my.job.settings.enddate")

    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, startDate, endDate)    

然后像上面那样在代码中设置参数

^{pr2}$

把参数传递给job config,然后用reducer_init中的get_jobconf_value读取它们怎么样?这样,您只需读取一次参数。在

像这样:

from mrjob.compat import get_jobconf_value

class MRDataQuality(MRJob):

  def reducer_init(self):
    ...
    self.startDate = get_jobconf_value("my.job.settings.startdate")
    self.endDate = get_jobconf_value("my.job.settings.enddate")

  def reducer(self, groupId, meterList):
    for meterId in meterList:
      sys.stderr.write("Querying: " + str(meterId) + "\n")
      df = extract_meter_data(table, meterId, self.startDate, self.endDate)    

然后像上面那样在代码中设置参数

^{pr2}$

相关问题 更多 >