数据流管道吞吐量随着执行的推进和意外的端输入行为而急剧下降

2024-10-01 22:40:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个数据流管道处理大约1Gb的数据输入,其中两个DICT作为side_输入。目标是在这两个输入的帮助下,从主数据集计算特征

管道总体结构如下:

    # First side input, ends up as a 2GB dict with 3.5 million keys
    side_inp1 = ( p | 
        "read side_input1" >> beam.io.ReadFromAvro("$PATH/*.avro") | 
        "to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
    )

    # Second side input, ends up as a 1.6GB dict with 4.5 million keys
    side_inp2 = (p | 
        "read side_input2" >> beam.io.ReadFromAvro("$PATH2/*.avro") | 
        "to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
    )
    
    # The main part of the pipeline, reading an avro dataset of 1 million rows -- 20GB
    (p |
     "read inputs" >> beam.io.ReadFromAvro("$MainPath/*.avro") | 
     "main func" >> beam.Map(MyMapper, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))
     )

以下是数据流图:

Dataflow graph

以及“Featureize”步骤展开: enter image description here

Featureize是一个在输入端查找ID的函数,它查找向量,并使用180种不同的向量点积方法来计算某些特征。这是一个完全受CPU限制的进程,预计需要比管道中的其他部分更长的时间,但这里的奇怪之处在于暂停

我的问题有两个:

  1. 随着数据流在流程中的进一步移动,它的速度似乎急剧减慢。我不知道原因是什么,我怎样才能缓解这个问题。下面可以看到MyMapper步骤的吞吐量图,我想知道吞吐量是否在下降(从~400行/秒下降到最后的~1行/秒)

A throughput chart of MyMapper state

  1. 另外,side_输入的行为对我来说很奇怪。我希望side_输入是只读的,并且只有一次,但是当我签出作业度量/吞吐量图表时,我看到了下面的图表。可以看出,管道不断地读取side_输入,而我想要的只是保存在内存中的两个dict

Overal Job Throughput

其他作业配置

  • 地区:美国中部-1a
  • 机器类型:m1-ultramem-40(40个CPU核,960GB RAM)
  • 磁盘类型/大小:ssd/50GB
  • 实验:已启用洗牌服务
  • 最大工作人数:1有助于简化计算和度量,并且不会因自动缩放而使它们发生变化

额外观察

  • 我经常在LogViewer中看到如下日志实体:[INFO] Completed workitem: 4867151000103436312 in 1069.056863785 seconds" 到目前为止,所有完成的工作项都花费了大约1000-1100秒,这是另一个令人困惑的原因,为什么在处理工作项花费与以前相同的时间时吞吐量会下降?并行性是否因某种原因而下降?(可能是一些我无法控制的隐藏线程阈值,比如线束线程?)

  • 在管道的后面部分,查看日志,执行模式看起来非常有序(似乎它正在执行1个工作项,完成它,转到下一个工作项,考虑到有1TB的可用内存和40个内核,这对我来说很奇怪)

  • 有0个错误甚至警告


Tags: of数据iomapread管道吞吐量side
1条回答
网友
1楼 · 发布于 2024-10-01 22:40:52

第1点中的吞吐量图是一个很好的指标,表明您的工作绩效有所下降

侧输入拟为in memory;但是,我不太确定只有1个highmem节点的管道是否是一种好方法。由于只有一个节点,管道可能存在难以识别的瓶颈,例如网络或操作系统限制(例如,与加载到内存中的文件相关的操作系统中打开的max number of files)。由于beam的体系结构,我认为即使启用了autoscaling也可以拥有更多节点不是问题,因为我们发现自动缩放会自动选择运行作业所需的适当数量的工作实例。如果您出于其他原因担心计算和指标,请与我们分享

关于第2点,我认为应该在图上找到活动,因为边输入(内存中)正在被处理。但是,如果这对您没有意义,您可以随时添加完整的作业图,以便我们了解管道步骤的任何其他细节

我的建议是添加更多工作人员来分发workaload,因为PCollection是一个分布式数据集,将在可用节点之间分发。您可以尝试使用更多节点使用类似的计算资源,例如,4个实例n2d-highmem-16(16vCPU 128GB)。有了这些变化,任何瓶颈都有可能消失或得到缓解;此外,您还可以使用相同的方式监视新作业:

  • 记住在管道中check errors,这样您就可以确定正在发生/导致性能问题的任何其他问题

  • 检查数据流UI中的CPU and Memory usage。如果在作业级别发生内存错误,Stackdriver应将其显示为memory errors,但也应检查memory in the host instance,以确保其未因其他原因达到操作系统中的限制

  • 您可能希望将this example与边输入一起作为字典进行检查。我不是专家,但您可以遵循示例中的最佳实践

更新

如果机器n2d-highmem-16有OOM,在我看来,每个线束螺纹可能使用dicts的副本。不太确定配置线程数是否有帮助,但您可以尝试在管道选项中设置^{}

另一方面,可以展开步骤Featurize?在这一步中,wall time非常高(~6天),让我们检查一下吸收这种延迟的composite transforms。对于有问题的复合转换,请让我们了解代码片段。要确定可能存在问题的复合转换,请参考Side Inputs Metrics,特别是Time spent writingTime spent reading

相关问题 更多 >

    热门问题