Apache Beam最小值、最大值和平均值

2024-10-01 07:17:31 发布

您现在位置:Python中文网/ 问答频道 /正文

根据这个link,Guillem Xercavins为compute minimum和maximum编写了一个自定义类。在

class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max

我还需要计算平均值,我发现示例代码如下:

^{pr2}$

你知道如何将平均值方法合并到MinMax中,这样我就只有一个类能够计算最小值、最大值和平均值,并生成一组键和值-3个值的数组吗?在


Tags: selfinputreturndeflinkcurrentminmax
1条回答
网友
1楼 · 发布于 2024-10-01 07:17:31

这是组合类的解决方案,加上中值

import numpy as np

class MinMaxMeanFn(beam.CombineFn):

    def create_accumulator(self):
        # sum, min, max, count, median
        return (0.0, 999999999.0, 0.0, 0, [])

    def add_input(self, cur_data, input):
        (cur_sum, cur_min, cur_max, count, cur_median) = cur_data
        if type(input) == list:
            cur_count = len(input)
            sum_input = sum(input)
            min_input = min(input)
            max_input = max(input)
        else:
            sum_input = input
            cur_count = 1
        return cur_sum + sum_input, min(min_input, cur_min), max(max_input, cur_max), count + cur_count, cur_median + input

    def merge_accumulators(self, accumulators):
        sums, mins, maxs, counts, medians = zip(*accumulators)
        return sum(sums), min(mins), max(maxs), sum(counts), medians

    def extract_output(self, cur_data):
        (sum, min, max, count, medians) = cur_data
        avg = sum / count if count else float('NaN')
        med = np.median(medians)
        return  {
            "max": max,
            "min": min,
            "avg": avg,
            "count": count,
            "median": med
        }

用法示例:

^{pr2}$

*我没有测试CombinePerKey是否可以在没有GroupByKey的情况下工作,请随时测试。在

相关问题 更多 >