我一直在尝试查看给定的mapper_pre_filter
示例here。现在,如果不是直接在步骤中指定命令,而是编写一个方法来返回该命令,如下所示:
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
class KittiesJob(MRJob):
OUTPUT_PROTOCOL = JSONValueProtocol
def filter_input(self):
return ''' grep 'kitty' '''
def test_for_kitty(self, _, value):
yield None, 0 # make sure we have some output
if 'kitty' in value:
yield None, 1
def sum_missing_kitties(self, _, values):
yield None, sum(values)
def steps(self):
return [
self.mr(mapper_pre_filter=self.filter_input,
mapper=self.test_for_kitty,
reducer=self.sum_missing_kitties)]
if __name__ == '__main__':
KittiesJob().run()
我得到了以下例外:
^{pr2}$有人能解释一下我做错了什么吗?在
哇,这是迟来的答案。我想你应该改变这一点:
mapper_pre_filter=self.filter_input,
到mapper_pre_filter=self.filter_input(),
。{cd3>中的函数}不是一个函数。也许将来会对别人有所帮助。
堆栈跟踪表明过滤器的输出不是JSON可序列化的,因为它可能是空的。
相关问题 更多 >
编程相关推荐