如何为mrJobs中的多行输入编写自定义协议

2024-10-03 09:07:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试将mrJobs与csv文件一起使用。问题是csv文件的输入跨越了多行

通过搜索mrJob文档,我认为我需要编写一个自定义协议来处理输入

我试图在下面编写我自己的协议,multiLineCsvInputProtocol,但我已经得到一个错误:TypeError: a bytes-like object is required, not 'str'

我不想撒谎,我想我在这里是疯了

基本上,多行csv文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,将每一行都放在逗号上,将值存储在一个列表中,每当新行以日期字符串开头时,我想yield将整个列表映射到第一个映射器

(或找到其他更好的方法来读取多行csv输入)

有人能帮我通过这个错误吗

import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol

class multiLineCsvInputProtocol(object):
    def read(self, line):
        key, val = enumerate(line.split(',', 1))
        return key, val


class someTask(MRJob):

  INPUT_PROTOCOL = multiLineCsvInputProtocol

  def mapper1(self,_, row):
    yield (row, 1 )


if __name__ == '__main__':
    MRFindReciprocal.run()
    

Tags: 文件csv字符串fromimport协议列表object
1条回答
网友
1楼 · 发布于 2024-10-03 09:07:55

根据mrjobdocumentationread函数的参数的类型为bytestring,您很可能会得到该错误,因为您被','分割,这是一个str

Writing custom protocols

A protocol is an object with methods read(self, line) and write(self, key, value). The read() method takes a bytestring and returns a 2-tuple of decoded objects, and write() takes the key and value and returns bytes to be passed back to Hadoop Streaming or as output.

可能的解决办法:

  1. 您可以尝试通过b','进行拆分,这是一个bytestring
  2. 您可以在分割之前解码,如下:line.decode().split(',', 1)(指定编码可能是个好主意)

相关问题 更多 >