Apache Beam Python Dataflow job with a GCP Pub/Sub counter is overcounting

Posted 2024-09-29 19:18:34


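I have a Dataflow job that accumulates UI interactions received through GCP Pub/Sub. I tested it with a script that sends many Pub/Sub messages, each representing an interaction, to the input topic. At a low message rate (<500/sec), the Dataflow job counts the interactions correctly. But when I increase the message rate, the counts emitted by the Dataflow job suddenly become much higher (5-10x) than the number of Pub/Sub messages sent to the input topic.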

The ideas I have explored:

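  1. Pub/Sub is redelivering unacknowledged messages

This doesn't make sense, because the acknowledgement deadline of the input topic's subscription is 1 minute

  2. Something is wrong with my trigger configuration

  3. Something I don't understand is happening in ReadFromPubSub or CombineGlobally(CountFn())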
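One way to test the redelivery idea is to compare how many identifiers arrive downstream with how many distinct identifiers there are. The following is a minimal sketch in plain Python, assuming the test script puts a unique identifier in every message and that those identifiers have been collected into a list on the receiving side. `duplicate_report` and the sample IDs are hypothetical and not part of Beam or the Pub/Sub client.

```python
from collections import Counter


def duplicate_report(event_ids):
    """Summarize how many identifiers were seen more than once."""
    counts = Counter(event_ids)
    duplicates = {eid: n for eid, n in counts.items() if n > 1}
    return {
        "received": len(event_ids),   # total deliveries observed
        "unique": len(counts),        # distinct identifiers
        "duplicated_ids": duplicates, # identifier -> number of deliveries
    }


# Hypothetical identifiers observed downstream; "b" shows up three times.
observed = ["a", "b", "b", "c", "b"]
report = duplicate_report(observed)
print(report)
```

If `received` is far larger than `unique` at high message rates, duplicates are entering the pipeline before the combiner; if the two match, the extra counts are more likely produced by windowing or triggering.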

import json

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime


class CountFn(beam.CombineFn):
    def create_accumulator(self):
        # interaction1, interaction2, interaction3, interaction4
        return 0, 0, 0, 0

    def add_input(self, interactions, element):
        # Add a field only if it is present, an int, and positive.
        (interaction1, interaction2, interaction3, interaction4) = interactions
        interaction1_result = interaction1 + element['interaction1'] if ('interaction1' in element and isinstance(element['interaction1'], int) and element['interaction1'] > 0) else interaction1
        interaction2_result = interaction2 + element['interaction2'] if ('interaction2' in element and isinstance(element['interaction2'], int) and element['interaction2'] > 0) else interaction2
        interaction3_result = interaction3 + element['interaction3'] if ('interaction3' in element and isinstance(element['interaction3'], int) and element['interaction3'] > 0) else interaction3
        interaction4_result = interaction4 + element['interaction4'] if ('interaction4' in element and isinstance(element['interaction4'], int) and element['interaction4'] > 0) else interaction4
        return interaction1_result, interaction2_result, interaction3_result, interaction4_result

    def merge_accumulators(self, accumulators):
        # Sum each position across all partial accumulators.
        interaction1, interaction2, interaction3, interaction4 = zip(*accumulators)
        return sum(interaction1), sum(interaction2), sum(interaction3), sum(interaction4)

    def extract_output(self, interactions):
        (interaction1, interaction2, interaction3, interaction4) = interactions

        output = {
            'interaction1': interaction1,
            'interaction2': interaction2,
            'interaction3': interaction3,
            'interaction4': interaction4
        }
        return output

def to_json(e):
    try:
        return json.loads(e.decode('utf-8'))
    except json.JSONDecodeError:
        return {}

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from pubsub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | 'To Json' >> beam.Map(to_json)
     | 'Window' >> beam.WindowInto(window.FixedWindows(1),
                                   trigger=AfterProcessingTime(delay=1 * 3),
                                   accumulation_mode=AccumulationMode.DISCARDING,
                                   allowed_lateness=2)
     | 'Calculate Metrics' >> beam.CombineGlobally(CountFn()).without_defaults()
     | 'To bytestring' >> beam.Map(lambda e: json.dumps(e).encode('utf-8'))
     | 'Write to pubsub' >> beam.io.WriteToPubSub(topic=known_args.output_topic))
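To check whether the combiner itself could inflate the counts, its accumulator logic can be exercised outside Beam. The sketch below is an assumption-laden stand-in, not the job's actual code: `PlainCountFn` mirrors the four `CountFn` methods without the `beam.CombineFn` base class, and the hypothetical `run` helper spreads elements over several accumulators before merging them, roughly as a runner might.

```python
KEYS = ("interaction1", "interaction2", "interaction3", "interaction4")


class PlainCountFn:
    """Hypothetical stand-in with the same four methods as CountFn."""

    def create_accumulator(self):
        return (0, 0, 0, 0)

    def add_input(self, acc, element):
        # Add only positive int values; missing or invalid fields are ignored.
        return tuple(
            total + element[key]
            if isinstance(element.get(key), int) and element[key] > 0
            else total
            for total, key in zip(acc, KEYS)
        )

    def merge_accumulators(self, accumulators):
        # Sum each position across all partial accumulators.
        return tuple(sum(column) for column in zip(*accumulators))

    def extract_output(self, acc):
        return dict(zip(KEYS, acc))


def run(fn, elements, shards):
    """Fold elements into `shards` separate accumulators, then merge them."""
    accs = [fn.create_accumulator() for _ in range(shards)]
    for i, element in enumerate(elements):
        accs[i % shards] = fn.add_input(accs[i % shards], element)
    return fn.extract_output(fn.merge_accumulators(accs))


# Made-up sample elements, including the kinds of input the filter rejects.
elements = [
    {"interaction1": 1},
    {"interaction2": 2, "interaction4": 1},
    {"interaction1": 1, "interaction3": -5},  # negative value is ignored
    {},                                        # what to_json returns on bad JSON
    {"interaction2": "3"},                     # non-int value is ignored
]

single = run(PlainCountFn(), elements, shards=1)
sharded = run(PlainCountFn(), elements, shards=3)
print(single)
print(single == sharded)
```

If the single-accumulator and sharded results agree, as they do for this sample, the combine logic counts each element once no matter how the elements are partitioned, which points the investigation toward duplicated input or the window and trigger settings instead.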

