我正在尝试将mrJobs与csv文件一起使用。问题是csv文件的输入跨越了多行
通过搜索mrJob文档,我认为我需要编写一个自定义协议来处理输入
我试图在下面编写我自己的协议,multiLineCsvInputProtocol
,但我已经得到一个错误:TypeError: a bytes-like object is required, not 'str'
我不想撒谎,我想我在这里是疯了
基本上,多行csv文件中的每一行新数据都以日期字符串开头。我想逐行读取输入,将每一行都放在逗号上,将值存储在一个列表中,每当新行以日期字符串开头时,我想yield
将整个列表映射到第一个映射器
(或找到其他更好的方法来读取多行csv输入)
有人能帮我通过这个错误吗
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import protocol
class multiLineCsvInputProtocol(object):
def read(self, line):
key, val = enumerate(line.split(',', 1))
return key, val
class someTask(MRJob):
INPUT_PROTOCOL = multiLineCsvInputProtocol
def mapper1(self,_, row):
yield (row, 1 )
if __name__ == '__main__':
MRFindReciprocal.run()
根据
mrjob
的documentation,read函数的行参数的类型为bytestring,您很可能会得到该错误,因为您被','
分割,这是一个str:可能的解决办法:
b','
进行拆分,这是一个bytestringline.decode().split(',', 1)
(指定编码可能是个好主意)相关问题 更多 >
编程相关推荐