MRJob中的mapper_pre_过滤器

2024-10-06 06:49:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在尝试查看给定的mapper_pre_filter示例here。现在,如果不是直接在步骤中指定命令,而是编写一个方法来返回该命令,如下所示:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class KittiesJob(MRJob):
    OUTPUT_PROTOCOL = JSONValueProtocol

    def filter_input(self):
        return ''' grep 'kitty' '''

    def test_for_kitty(self, _, value):
        yield None, 0  # make sure we have some output
        if 'kitty' in value:
            yield None, 1

    def sum_missing_kitties(self, _, values):
        yield None, sum(values)

    def steps(self):
        return [
            self.mr(mapper_pre_filter=self.filter_input,
                    mapper=self.test_for_kitty,
                    reducer=self.sum_missing_kitties)]

if __name__ == '__main__':
    KittiesJob().run()

我得到了以下例外:

^{pr2}$

有人能解释一下我做错了什么吗?在


Tags: fromimport命令selfnonedeffilterpre
1条回答
网友
1楼 · 发布于 2024-10-06 06:49:48

哇,这是迟来的答案。我想你应该改变这一点: mapper_pre_filter=self.filter_input,mapper_pre_filter=self.filter_input(),

{cd3>中的函数}不是一个函数。也许将来会对别人有所帮助。

堆栈跟踪表明过滤器的输出不是JSON可序列化的,因为它可能是空的。

相关问题 更多 >