Python mrjob mapreduce如何预处理输入fi

2024-09-19 23:44:17 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在放入mapreduce之前预处理一个XML文件以提取某些节点。我有以下代码:

from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.util import cmd_line, bash_wrap

class MRCountLinesByFile(MRJob):
    def configure_options(self):
        super(MRCountLinesByFile, self).configure_options()
        self.add_file_option('--filter')

    def mapper_cmd(self):
        cmd = cmd_line([self.options.filter, jobconf_from_env('mapreduce.map.input.file'])
        return cmd



if __name__ == '__main__':
    MRCountLinesByFile.run()

在命令行中,我键入:

^{pr2}$

test.txt是与here类似的普通XML文件。而filter.py是一个查找所有标题信息的脚本。在

但是,我得到了以下错误:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625
Running step 1 of 1...
Traceback (most recent call last):
  File "./filter.py", line 8, in <module>
    with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: 'None'
Step 1 of 1 failed: Command '['./filter.py', 'None']' returned non-zero exit status 1

在本例中它看起来像mapreduce.map.input.file呈现{}。如何让mapper_cmd函数读取mrjob当前正在读取的文件?在


Tags: 文件frompyimportselfcmdlinexml
1条回答
网友
1楼 · 发布于 2024-09-19 23:44:17

据我所知self.add_file_选项应该有文件的路径。在

self.add_file_option(' items', help='Path to u.item')

我不太明白你的想法,但我的理解是。 使用configure选项可确保将给定文件发送给所有映射器进行处理,例如,当您要对源文件以外的其他文件中的数据执行辅助查找时。此辅助查找文件由提供self.add_file_选项('items',help='Path to u.item')。在

要在reducer或mapper阶段之前进行预处理,可以使用reducer_init或mapper_init。这些初始化或处理步骤也需要在您的step函数中提到,例如如下所示。在

^{pr2}$

在init函数中,在发送到mapper或reducer之前,对需要完成的内容进行实际的预处理。例如,打开一个xyz文件,将第一个字段中的值复制到另一个字段中,我将在我的reducer中使用该字段并输出相同的值。在

def reducer_init(self):
        self.movieNames = {}    
        with open("xyz") as f:
            for line in f:
                fields = line.split('|')
                self.myNames[fields[0]] = fields[1]

希望这有帮助!!在

相关问题 更多 >