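I have a Dataflow job that accumulates UI interactions from GCP Pub/Sub. I tested it with a script that sends many Pub/Sub messages, each representing an interaction, to the input topic. At lower message rates (<500/sec) the Dataflow job counts the interactions correctly. But when I increase the number of messages, the counts the Dataflow job emits are suddenly far higher (5-10x) than the number of Pub/Sub messages sent to the input topic.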
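Ideas I've explored:
- Duplicate delivery from Pub/Sub: this doesn't make sense, because the ack deadline on the input topic's subscription is 1 minute.
- Something is wrong with my trigger configuration.
- Something I don't understand is happening in ReadFromPubSub or CombineGlobally(CountFn()).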
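If duplicate delivery were the cause, the extra counts would disappear once messages are deduplicated by a stable ID before counting. A 5-10x inflation would mean every message arriving five to ten times on average, so comparing raw and deduplicated totals should confirm or rule this out quickly. Below is a minimal, Beam-free sketch of that check, assuming each received message can be paired with its Pub/Sub message ID; the `(message_id, payload)` tuples and the `dedupe` helper are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def dedupe(messages: Iterable[Tuple[str, Dict]]) -> List[Dict]:
    """Keep only the first payload seen for each (hypothetical) message ID."""
    seen = set()
    unique = []
    for message_id, payload in messages:
        if message_id in seen:
            continue  # redelivered copy: skip it
        seen.add(message_id)
        unique.append(payload)
    return unique


received = [
    ("m1", {"interaction1": 1}),
    ("m2", {"interaction2": 1}),
    ("m1", {"interaction1": 1}),  # same ID delivered twice
]
unique = dedupe(received)
print(len(received), len(unique))  # 3 2
```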
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime


class CountFn(beam.CombineFn):
    def create_accumulator(self):
        # interaction1, interaction2, interaction3, interaction4
        return 0, 0, 0, 0

    def add_input(self, interactions, input):
        (interaction1, interaction2, interaction3, interaction4) = interactions
        interaction1_result = interaction1 + input['interaction1'] if ('interaction1' in input and isinstance(input['interaction1'], int) and input['interaction1'] > 0) else interaction1
        interaction2_result = interaction2 + input['interaction2'] if ('interaction2' in input and isinstance(input['interaction2'], int) and input['interaction2'] > 0) else interaction2
        interaction3_result = interaction3 + input['interaction3'] if ('interaction3' in input and isinstance(input['interaction3'], int) and input['interaction3'] > 0) else interaction3
        interaction4_result = interaction4 + input['interaction4'] if ('interaction4' in input and isinstance(input['interaction4'], int) and input['interaction4'] > 0) else interaction4
        return interaction1_result, interaction2_result, interaction3_result, interaction4_result

    def merge_accumulators(self, accumulators):
        interaction1, interaction2, interaction3, interaction4 = zip(*accumulators)
        return sum(interaction1), sum(interaction2), sum(interaction3), sum(interaction4)

    def extract_output(self, interactions):
        (interaction1, interaction2, interaction3, interaction4) = interactions
        output = {
            'interaction1': interaction1,
            'interaction2': interaction2,
            'interaction3': interaction3,
            'interaction4': interaction4
        }
        return output
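One thing worth ruling out is what CountFn actually totals. The plain-Python copy below reproduces its logic without the `beam.CombineFn` base class (so it runs without Apache Beam; `CountFnLocal`, `FIELDS` and the sample bundles are illustrative names, not part of the original job) and walks through the combine lifecycle: create, add per bundle, merge, extract. It suggests that the output is the sum of field values, not the number of messages, and that a JSON `true` is counted as 1 because `bool` is a subclass of `int`.

```python
from functools import reduce
from typing import Dict, Iterable, Tuple

FIELDS = ("interaction1", "interaction2", "interaction3", "interaction4")


class CountFnLocal:
    """Same logic as CountFn, minus the beam.CombineFn base class."""

    def create_accumulator(self) -> Tuple[int, ...]:
        return (0, 0, 0, 0)

    def add_input(self, acc: Tuple[int, ...], element: Dict) -> Tuple[int, ...]:
        # A field's value is added only if it is a positive int (bools pass too).
        return tuple(
            total + element[f]
            if f in element and isinstance(element[f], int) and element[f] > 0
            else total
            for total, f in zip(acc, FIELDS)
        )

    def merge_accumulators(self, accumulators: Iterable[Tuple[int, ...]]) -> Tuple[int, ...]:
        return tuple(sum(column) for column in zip(*accumulators))

    def extract_output(self, acc: Tuple[int, ...]) -> Dict[str, int]:
        return dict(zip(FIELDS, acc))


fn = CountFnLocal()
# Two bundles, as if processed on different workers, then merged.
bundle_a = [{"interaction1": 1}, {"interaction1": 5}]
bundle_b = [{"interaction2": 1}, {"interaction1": True}, {}]
acc_a = reduce(fn.add_input, bundle_a, fn.create_accumulator())
acc_b = reduce(fn.add_input, bundle_b, fn.create_accumulator())
result = fn.extract_output(fn.merge_accumulators([acc_a, acc_b]))
print(result)
# {'interaction1': 7, 'interaction2': 1, 'interaction3': 0, 'interaction4': 0}
```

Five messages yield an `interaction1` total of 7, so the totals match the message count only if the load-test script always sends a value of 1 per field.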
def to_json(e):
    try:
        return json.loads(e.decode('utf-8'))
    except json.JSONDecodeError:
        return {}
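`to_json` only catches `json.JSONDecodeError`. A payload that is not valid UTF-8 raises `UnicodeDecodeError` from `decode`, and valid JSON that is not an object (for example `5`) makes the `'interaction1' in input` check in CountFn raise `TypeError`. A slightly hardened variant is sketched below; it addresses robustness, not the overcount itself.

```python
import json


def to_json(e: bytes) -> dict:
    """Decode a Pub/Sub payload; return {} for anything that is not a JSON object."""
    try:
        parsed = json.loads(e.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return {}
    # Non-object JSON such as 5 would make CountFn's `in` checks raise TypeError.
    return parsed if isinstance(parsed, dict) else {}


print(to_json(b'{"interaction1": 1}'))  # {'interaction1': 1}
print(to_json(b"\xff"))                 # {}
print(to_json(b"5"))                    # {}
```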
# pipeline_options and known_args are built from command-line arguments (not shown).
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from pubsub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | 'To Json' >> beam.Map(to_json)
     | 'Window' >> beam.WindowInto(window.FixedWindows(1),
                                   trigger=AfterProcessingTime(delay=1 * 3),
                                   accumulation_mode=AccumulationMode.DISCARDING,
                                   allowed_lateness=2)
     | 'Calculate Metrics' >> beam.CombineGlobally(CountFn()).without_defaults()
     | 'To bytestring' >> beam.Map(lambda e: json.dumps(e).encode('utf-8'))
     | 'Write to pubsub' >> beam.io.WriteToPubSub(topic=known_args.output_topic))
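To reason about the trigger idea, it helps to separate what windowing and accumulation mode can do to counts. The plain-Python sketch below is a simplified model, not Beam's implementation (`window_start` and `pane_counts` are hypothetical helpers). It assigns millisecond timestamps to 1-second fixed windows, then counts the elements emitted when a single window fires several times. Each element lands in exactly one fixed window, so windowing alone should not multiply counts. Summing every pane overcounts only under ACCUMULATING mode, whereas the pipeline above uses DISCARDING.

```python
from collections import defaultdict
from typing import Dict, List


def window_start(ts_ms: int, size_ms: int = 1000) -> int:
    """Start of the fixed window that contains ts_ms (all values in milliseconds)."""
    return ts_ms - ts_ms % size_ms


def pane_counts(new_per_firing: List[int], accumulating: bool) -> List[int]:
    """Element count in each pane when one window fires several times."""
    emitted, seen = [], 0
    for new in new_per_firing:
        seen += new
        # ACCUMULATING re-emits everything so far; DISCARDING emits only new elements.
        emitted.append(seen if accumulating else new)
    return emitted


timestamps_ms = [200, 700, 1100, 1900, 2500]
per_window: Dict[int, int] = defaultdict(int)
for ts in timestamps_ms:
    per_window[window_start(ts)] += 1
print(dict(per_window))  # {0: 2, 1000: 2, 2000: 1}

firings = [3, 2, 1]  # new elements arriving before each firing of a single window
print(sum(pane_counts(firings, accumulating=False)))  # 6
print(sum(pane_counts(firings, accumulating=True)))   # 14
```

Since AfterProcessingTime is not wrapped in Repeatedly here, each window should fire at most once. Combined with DISCARDING, this model suggests the trigger setup by itself is an unlikely source of a 5-10x inflation.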