Beam/Python后处理时间触发器不工作?

2024-09-30 06:15:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我用Python编写了一个小Beam管道来计算实时聊天中发送的消息。消息是通过Pub/Sub传递的,它们已经有了我在apachebeam中使用的时间戳。在

我使用10分钟的固定时间窗口,我想每分钟输出一次结果。。。但那没用。我每10分钟有一个输出(窗口大小)。在

events = \
            (p
             | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | 'DecodeString' >> beam.Map(lambda e: e.decode('utf-8'))
             | 'TransformJsonToDictionary' >> beam.Map(lambda e: json.loads(e))
             | 'ParseEventsFn' >> beam.ParDo(ParseEventsFn())
             | 'AddEventTimestamps' >> beam.Map(
                        lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
             | 'ApplyWindow' >> beam.WindowInto(
                        window.FixedWindows(size=10 * 60),
                        trigger=trigger.Repeatedly(
                            trigger.AfterProcessingTime(delay=1 * 60)
                        ),
                        accumulation_mode=trigger.AccumulationMode.DISCARDING)
             | 'PairWithOne' >> beam.Map(lambda e: (e['channel_id'], 1))
             | beam.CombinePerKey(sum)
             | 'DEBUG' >> beam.ParDo(PrintFn('DEBUG')))

我是不是少了点什么?如何使用事件时间实现类似的目标?在

谢谢


Tags: lambdadebug消息maptopic管道时间window

热门问题