Apache Beam streaming管道中的侧面输入产生错误“NoneType没有属性max_timestamp”

2024-06-29 00:22:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个10秒固定窗口的流媒体管道窗口。 当我到达代码的这一部分时:

flatten = ((input_source1, input_source2, input_source3, input_source4) | beam.Flatten()
           | beam.Filter(filter_existing, existing=beam.pvalue.AsList(existing_timeline_input))
          )

我得到一个运行时错误“AttributeError:'非类型'对象没有属性'max\u timestamp'”

在map_的第65行/apache_beam/transforms/sideinputs.py中通过_end生成 window.WindowFn.AssignContext(source\u window.max\u timestamp())[-1]

如果我在过滤器前添加一个洗牌,如下所示:

flatten = ((input_source1, input_source2, input_source3, input_source4) | beam.Flatten()
           | beam.Reshuffle()
           | beam.Filter(filter_existing, existing=beam.pvalue.AsList(existing_timeline_input))
          )

然后它工作,我没有得到错误。 你能解释一下这是怎么可能的吗

谢谢大家!

编辑:

有些输入源是无的

  • 如果删除“无”输入源,则不会出现错误
  • 如果在展平后打印生成的Pcollection,则看不到任何None元素
  • 如果在过滤器转换中使用常量而不是AsList端输入,则不会得到任何错误

编辑2:

以下是代码的其余部分:

    start = (p | beam.io.ReadFromPubSub(topic=topic)
             | "decoding" >> beam.Map(lambda x: json.loads(x.decode()))
             | 'With timestamps' >> beam.Map(lambda user: beam.window.TimestampedValue(user, time.time()))
             | 'process User' >> beam.ParDo(
                processors.UserProcessor(project=project, region=region))
             | 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
             )


input_source1 = (start| beam.ParDo(collectors.GeneralCollector(param)))

input_source2 = (start
                 | beam.ParDo(
            collectors.PersonalCollector(param)))

input_source3 = (start
                 | beam.ParDo(collectors.MyCollector()))

input_source4 = (start
                 | beam.ParDo(
            collectors.AuthoredCollector(project=project)))

existing_input = (start
                 | beam.ParDo(
            processors.ExistingEntriesProcessor(project=project, region=region))
                 | beam.Map(lambda x: x.get('id'))
                 )

Tags: projectinput错误windowstartregionbeamexisting