如何使用s3对象名作为MRJob映射器的输入,而不是s3对象本身?

2024-09-26 22:49:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我遗漏了一些关于Yelp的mrjob job库的明显信息。设置MRJob类几乎是非常简单的。在file或stdin上运行它也是如此。但是如何将作业的输入从本地或s3中的文件更改为s3存储桶中的键?在

像这样。假设我想计算S3存储桶中以字符串“foo”开头的所有对象:

import re

class MRCountS3Objects(MRJob):

    define mapper(self, _, botoS3Key):
        if re.match('^foo', botoS3Key.name):
            yield 'foo', 1

    define reduce(self, name, occurrences):
        yield name, sum(occurrences)

这是一个非常做作的例子,但你可能明白我的意思。如何让MRJob在s3对象流上操作,而忽略对象的内容?我看到了S3Filesystem.get_s3_密钥()method,这正是我需要的流,但我不知道从那里到哪里去。在


Tags: 对象nameselfres3foojobyelp
1条回答
网友
1楼 · 发布于 2024-09-26 22:49:36

至少有一种方法可以做到这一点。MRJob有一个可以分配给任何迭代器的stdin属性,然后可以编程方式运行该作业。例如,此代码应该处理my-bucket的键名:

from mrjob.job import MRJob
from mrjob.emr import EMRJobRunner

class MRS3KeyProcessor(MRJob):
    # Do some MRJob stuff.
    ...

def s3_name_generator(bucket):
    """Generator that returns boto.s3.Key names.
    """
    # Could also use raw boto here.
    emr = EMRJobRunner()
    key_stream = emr.fs.get_s3_keys(bucket)
    for key in key_stream:
        yield key.name

def main():
    # The '-' argument signifies that we use stdin.
    mr_job = MRCountS3Objects([' runner', 'inline', '-'])
    stdin = s3_name_generator('my-bucket')
    mr_job.stdin = stdin
    results = []
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            results.append((key, value))
    print(results)

if __name__ == '__main__':
    main()

相关问题 更多 >

    热门问题