我使用下面的代码片段将CSV文件作为Dicts读入管道。在
class MyCsvFileSource(beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.DictReader(self._file, dialect=MyCustomDialect)
for rec in reader:
yield rec
这个片段几乎是从How to convert csv into a dictionary in apache beam dataflow (Pablo的答案)的帖子中抄袭过来的。在
后来我注意到这一切都与相对较小的文件(例如35k行)很好。生成的bigsaw.700k行数较大,但输出的行数较大。几乎是5倍,所以我得到了超过3米的行。在
我仔细观察了beam.io.filebasedsource.FileBasedSource
,看到了参数{True
。在
文件上说:
^{pr2}$当参数设置为True
时,它可以并行读取源文件。在
我注意到,如果我将这个参数设置为False
,文件读起来很好,并且没有重复项。在
目前,我将这个splittable
参数设置为False
,因为它可以排除重复项,但我不确定当我的文件将成行增长时,这是否是未来的证据。在
并行读取源文件是否可能存在问题?有没有什么我忽略了或者没有正确处理的地方?在
要支持不重复的拆分,在从源代码读取时必须使用传递的“range_tracker”对象。例如,当声明正在读取的文件的唯一位置时,必须调用try\u claim()。在
更多信息请参见以下内容。 https://beam.apache.org/documentation/sdks/python-custom-io/
相关问题 更多 >
编程相关推荐