我正在开发一个数据流管道,它从GCS读取1000个文件(每个50 MB),并在所有文件的行上执行一些计算。每个文件都是具有相同结构的CSV,只是其中的数字不同,我计算所有文件中每个单元格的平均值
管道如下所示(python):
additional_side_inputs = {'key1': 'value1', 'key2': 'value2'} # etc.
p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
| 'Read files' >> ReadMatches()
| 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
| 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())
FileToRowsFn
类如下所示(参见下文,省略了一些细节)。row_id
是第一列,是每行的唯一键;它在每个文件中只存在一次,因此我可以计算所有文件的平均值。还有一些附加值作为变压器的侧输入提供,下面的方法体中没有显示这些值,但实际实现仍然使用这些值。此值是在管道外部创建的字典。我在这里提到它,以防这可能是缺乏并行化的一个原因
class FileToRowsFn(beam.DoFn):
def process(self, file_element, additional_side_inputs):
with file_element.open() as csv_file:
for row_id, *values in csv.reader(TextIOWrapper(csv_file, encoding='utf-8')):
yield row_id, values
AverageCalculatorFn
是一个典型的带有累加器的beam.CombineFn
,它对所有文件中具有相同行id的所有行执行给定行的每个单元格的平均值
所有这些都可以正常工作,但性能和吞吐量存在问题:执行此管道需要60多个小时。在监控控制台中,我注意到文件是按顺序读取的(每2分钟读取一个文件)。我理解读取一个文件可能需要2分钟(每个文件50 MB),但我不理解为什么dataflow不分配更多的工作人员来并行读取多个文件。cpu保持在~2-3%,因为大部分时间都花在文件IO上,并且工作进程数不超过2(尽管没有设置限制)
ReadMatches
的输出是1000个文件记录,那么为什么数据流不创建大量的FileToRowsFn
实例并将它们分派给新的工作进程,每个工作进程处理一个文件
有没有办法强制执行这种行为
这可能是因为Dataflow runner将您的所有步骤融合为一个步骤
对于这样一个要并行化的融合包,第一步需要是可并行化的。在您的例子中,这是一个不可并行化的glob扩展
要使管道可并行化,可以尝试中断fusion。这可以通过添加Reshuffle转换作为产生许多元素的步骤之一的使用者来实现
比如说,
如果使用Beam中可用的标准源之一(如
textio.ReadFromText()
)读取数据,则不必执行此操作。(遗憾的是,我们没有CSV源代码,但ReadFromText
支持跳过标题行)有关融合优化和防止融合的更多信息,请参见here
相关问题 更多 >
编程相关推荐