我有一个10秒固定窗口的流媒体管道窗口。 当我到达代码的这一部分时:
flatten = ((input_source1, input_source2, input_source3, input_source4) | beam.Flatten()
| beam.Filter(filter_existing, existing=beam.pvalue.AsList(existing_timeline_input))
)
我得到一个运行时错误“AttributeError:'非类型'对象没有属性'max\u timestamp'”
在map_的第65行/apache_beam/transforms/sideinputs.py中通过_end生成 window.WindowFn.AssignContext(source\u window.max\u timestamp())[-1]
如果我在过滤器前添加一个洗牌,如下所示:
flatten = ((input_source1, input_source2, input_source3, input_source4) | beam.Flatten()
| beam.Reshuffle()
| beam.Filter(filter_existing, existing=beam.pvalue.AsList(existing_timeline_input))
)
然后它工作,我没有得到错误。 你能解释一下这是怎么可能的吗
谢谢大家!
编辑:
有些输入源是无的
编辑2:
以下是代码的其余部分:
start = (p | beam.io.ReadFromPubSub(topic=topic)
| "decoding" >> beam.Map(lambda x: json.loads(x.decode()))
| 'With timestamps' >> beam.Map(lambda user: beam.window.TimestampedValue(user, time.time()))
| 'process User' >> beam.ParDo(
processors.UserProcessor(project=project, region=region))
| 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
)
input_source1 = (start| beam.ParDo(collectors.GeneralCollector(param)))
input_source2 = (start
| beam.ParDo(
collectors.PersonalCollector(param)))
input_source3 = (start
| beam.ParDo(collectors.MyCollector()))
input_source4 = (start
| beam.ParDo(
collectors.AuthoredCollector(project=project)))
existing_input = (start
| beam.ParDo(
processors.ExistingEntriesProcessor(project=project, region=region))
| beam.Map(lambda x: x.get('id'))
)
目前没有回答
相关问题 更多 >
编程相关推荐