目前,我正在尝试计算电子商务购物车中剩余的总金额。我每次都有会话id和产品价格。然后我创建了一个延迟15秒的会话窗口,最后是一个CombinePerKey,将所有延迟相加。但是,在创建窗口之后,不会生成任何输出(使用beam.Map(print)
不会打印任何内容)。代码运行时没有任何错误,我不知道该怎么办
以下是我的数据示例:
('2b88b00a-892a-4639-bce4-1ea17a7d6221', 2.54)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 3.97)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 4.29)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
这是我的管道:
pipe = beam.Pipeline(options=options)
def encode_byte_string(element):
print(element)
element = str(element)
return element.encode('utf-8')
ecommerce_data = (
pipe
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
| "Decode utf-8" >> beam.Map(lambda row: row.decode('utf-8'))
| "Window" >> beam.WindowInto(beam.window.Sessions(15))
| "Combine per key - sum" >> beam.CombinePerKey(sum)
| beam.Map(print)
| "Encode to byte string" >> beam.Map(encode_byte_string)
| "Write to output" >> beam.io.WriteToPubSub(topic=output)
)
假设您共享了完整的代码,这是因为没有密钥(或者设置不正确)。注意
row.decode('utf-8')
返回一种字符串类型。要修复此问题,请将字符串转换为元组,如下所示:(如果您以编码的JSON形式接收发布/订阅数据,则可以添加如下键):
相关问题 更多 >
编程相关推荐