如何在数据流中启用文件的并行读取?

2024-10-02 14:16:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个数据流管道,它从GCS读取1000个文件(每个50 MB),并在所有文件的行上执行一些计算。每个文件都是具有相同结构的CSV,只是其中的数字不同,我计算所有文件中每个单元格的平均值

管道如下所示(python):

additional_side_inputs = {'key1': 'value1', 'key2': 'value2'}  # etc.

p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
  | 'Read files' >> ReadMatches()
  | 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
  | 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())

FileToRowsFn类如下所示(参见下文,省略了一些细节)。row_id是第一列,是每行的唯一键;它在每个文件中只存在一次,因此我可以计算所有文件的平均值。还有一些附加值作为变压器的侧输入提供,下面的方法体中没有显示这些值,但实际实现仍然使用这些值。此值是在管道外部创建的字典。我在这里提到它,以防这可能是缺乏并行化的一个原因

class FileToRowsFn(beam.DoFn):
  def process(self, file_element, additional_side_inputs):
    with file_element.open() as csv_file:
      for row_id, *values in csv.reader(TextIOWrapper(csv_file, encoding='utf-8')):
        yield row_id, values

AverageCalculatorFn是一个典型的带有累加器的beam.CombineFn,它对所有文件中具有相同行id的所有行执行给定行的每个单元格的平均值

所有这些都可以正常工作,但性能和吞吐量存在问题:执行此管道需要60多个小时。在监控控制台中,我注意到文件是按顺序读取的(每2分钟读取一个文件)。我理解读取一个文件可能需要2分钟(每个文件50 MB),但我不理解为什么dataflow不分配更多的工作人员来并行读取多个文件。cpu保持在~2-3%,因为大部分时间都花在文件IO上,并且工作进程数不超过2(尽管没有设置限制)

ReadMatches的输出是1000个文件记录,那么为什么数据流不创建大量的FileToRowsFn实例并将它们分派给新的工作进程,每个工作进程处理一个文件

有没有办法强制执行这种行为


Tags: 文件csvid管道进程mbside数据流
1条回答
网友
1楼 · 发布于 2024-10-02 14:16:32

这可能是因为Dataflow runner将您的所有步骤融合为一个步骤

对于这样一个要并行化的融合包,第一步需要是可并行化的。在您的例子中,这是一个不可并行化的glob扩展

要使管道可并行化,可以尝试中断fusion。这可以通过添加Reshuffle转换作为产生许多元素的步骤之一的使用者来实现

比如说,

from apache_beam import Reshuffle

additional_side_inputs = {'key1': 'value1', 'key2': 'value2'}  # etc.

p | 'Collect CSV files' >> MatchFiles(input_dir + "*.csv")
  | 'Read files' >> ReadMatches()
  | 'Reshuffle' >> Reshuffle()
  | 'Parse contents' >> beam.ParDo(FileToRowsFn(), additional_side_inputs)
  | 'Compute average' >> beam.CombinePerKey(AverageCalculatorFn())

如果使用Beam中可用的标准源之一(如textio.ReadFromText())读取数据,则不必执行此操作。(遗憾的是,我们没有CSV源代码,但ReadFromText支持跳过标题行)

有关融合优化和防止融合的更多信息,请参见here

相关问题 更多 >

    热门问题