我有一个数据流管道处理大约1Gb的数据输入,其中两个DICT作为side_输入。目标是在这两个输入的帮助下,从主数据集计算特征
管道总体结构如下:
# First side input, ends up as a 2GB dict with 3.5 million keys
side_inp1 = ( p |
"read side_input1" >> beam.io.ReadFromAvro("$PATH/*.avro") |
"to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
)
# Second side input, ends up as a 1.6GB dict with 4.5 million keys
side_inp2 = (p |
"read side_input2" >> beam.io.ReadFromAvro("$PATH2/*.avro") |
"to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
)
# The main part of the pipeline, reading an avro dataset of 1 million rows -- 20GB
(p |
"read inputs" >> beam.io.ReadFromAvro("$MainPath/*.avro") |
"main func" >> beam.Map(MyMapper, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))
)
以下是数据流图:
Featureize是一个在输入端查找ID的函数,它查找向量,并使用180种不同的向量点积方法来计算某些特征。这是一个完全受CPU限制的进程,预计需要比管道中的其他部分更长的时间,但这里的奇怪之处在于暂停
我的问题有两个:
MyMapper
步骤的吞吐量图,我想知道吞吐量是否在下降(从~400行/秒下降到最后的~1行/秒)李>其他作业配置
额外观察
我经常在LogViewer中看到如下日志实体:[INFO] Completed workitem: 4867151000103436312 in 1069.056863785 seconds"
到目前为止,所有完成的工作项都花费了大约1000-1100秒,这是另一个令人困惑的原因,为什么在处理工作项花费与以前相同的时间时吞吐量会下降?并行性是否因某种原因而下降?(可能是一些我无法控制的隐藏线程阈值,比如线束线程?)
在管道的后面部分,查看日志,执行模式看起来非常有序(似乎它正在执行1个工作项,完成它,转到下一个工作项,考虑到有1TB的可用内存和40个内核,这对我来说很奇怪)
有0个错误甚至警告
第1点中的吞吐量图是一个很好的指标,表明您的工作绩效有所下降
侧输入拟为in memory;但是,我不太确定只有1个highmem节点的管道是否是一种好方法。由于只有一个节点,管道可能存在难以识别的瓶颈,例如网络或操作系统限制(例如,与加载到内存中的文件相关的操作系统中打开的max number of files)。由于beam的体系结构,我认为即使启用了autoscaling也可以拥有更多节点不是问题,因为我们发现自动缩放会自动选择运行作业所需的适当数量的工作实例。如果您出于其他原因担心计算和指标,请与我们分享
关于第2点,我认为应该在图上找到活动,因为边输入(内存中)正在被处理。但是,如果这对您没有意义,您可以随时添加完整的作业图,以便我们了解管道步骤的任何其他细节
我的建议是添加更多工作人员来分发workaload,因为PCollection是一个分布式数据集,将在可用节点之间分发。您可以尝试使用更多节点使用类似的计算资源,例如,4个实例n2d-highmem-16(16vCPU 128GB)。有了这些变化,任何瓶颈都有可能消失或得到缓解;此外,您还可以使用相同的方式监视新作业:
记住在管道中check errors,这样您就可以确定正在发生/导致性能问题的任何其他问题
检查数据流UI中的CPU and Memory usage。如果在作业级别发生内存错误,Stackdriver应将其显示为memory errors,但也应检查memory in the host instance,以确保其未因其他原因达到操作系统中的限制
您可能希望将this example与边输入一起作为字典进行检查。我不是专家,但您可以遵循示例中的最佳实践
更新
如果机器n2d-highmem-16有OOM,在我看来,每个线束螺纹可能使用dicts的副本。不太确定配置线程数是否有帮助,但您可以尝试在管道选项中设置^{}
另一方面,可以展开步骤
Featurize
?在这一步中,wall time非常高(~6天),让我们检查一下吸收这种延迟的composite transforms。对于有问题的复合转换,请让我们了解代码片段。要确定可能存在问题的复合转换,请参考Side Inputs Metrics,特别是Time spent writing
和Time spent reading
相关问题 更多 >
编程相关推荐