Apache Beam session window returns nothing

Posted 2024-10-01 02:25:13

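I'm trying to compute the total amount left in e-commerce shopping carts. Each record carries a session id and a product price. I create a session window with a 15-second gap, followed by a CombinePerKey that sums the prices for each session. However, nothing is produced after the window step (adding beam.Map(print) prints nothing). The code runs without any errors, and I don't know what to do.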

Here is a sample of my data:

('2b88b00a-892a-4639-bce4-1ea17a7d6221', 2.54)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 3.97)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 4.29)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)

Here is my pipeline:

pipe = beam.Pipeline(options=options)

def encode_byte_string(element):
   print(element)
   element = str(element)
   return element.encode('utf-8')

ecommerce_data = (
        pipe
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | "Decode utf-8" >> beam.Map(lambda row: row.decode('utf-8'))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))
        | "Combine per key - sum" >> beam.CombinePerKey(sum)
        | beam.Map(print)
        | "Encode to byte string" >> beam.Map(encode_byte_string)
        | "Write to output" >> beam.io.WriteToPubSub(topic=output)
    )

1 answer
User
#1 · Posted 2024-10-01 02:25:13

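Assuming you shared the complete code, this happens because the elements have no key (or the key is not set correctly). Note that row.decode('utf-8') returns a plain string, whereas CombinePerKey expects (key, value) pairs. To fix this, convert the string into a tuple, as follows: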

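import ast  # parses the text of a Python literal back into an object

ecommerce_data = (
        pipe
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        # Assumes each message is the text of a tuple such as "('id', 2.54)".
        # tuple(some_string) would only split the string into single characters;
        # ast.literal_eval turns it into the pair ('id', 2.54).
        | "Decode utf-8" >> beam.Map(lambda row: ast.literal_eval(row.decode('utf-8')))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))
        | "Combine per key - sum" >> beam.CombinePerKey(sum)
        # No beam.Map(print) step here: print returns None, so it would replace
        # every element with None. encode_byte_string already prints each element.
        | "Encode to byte string" >> beam.Map(encode_byte_string)
        | "Write to output" >> beam.io.WriteToPubSub(topic=output)
    )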
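
To check outside the pipeline that a decoded payload really becomes a (key, value) pair before it reaches CombinePerKey, something like the following may help. It is only a sketch: it assumes the messages are the text form of the tuples shown in the question, and parse_payload is a hypothetical helper name, not part of Beam.

```python
import ast


def parse_payload(payload: bytes):
    """Decode a Pub/Sub-style payload and parse it into a (key, value) tuple.

    Assumption: the payload is the text of a Python tuple, e.g. b"('id', 2.54)".
    """
    text = payload.decode("utf-8")
    # literal_eval only accepts Python literals, so it does not run arbitrary code.
    record = ast.literal_eval(text)
    if not (isinstance(record, tuple) and len(record) == 2):
        raise ValueError(f"expected a (key, value) pair, got {record!r}")
    return record


sample = b"('324c685c-4281-48d5-8783-7a7416f7d2b3', 3.97)"
key, price = parse_payload(sample)
print(key, price)

# For comparison, calling tuple() on a string yields one item per character.
print(tuple("ab"))
```

The same function could then be passed to beam.Map in place of the lambda in the "Decode utf-8" step.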

(If you receive the Pub/Sub data as encoded JSON, you can add the key like this:)

        | "Decode utf-8" >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
        | "Add key" >> beam.Map(lambda row: (row["id"], row["seconds"]))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))
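
For intuition about what Sessions(15) followed by CombinePerKey(sum) should emit once the elements are keyed, here is a rough pure-Python approximation. It is not Beam code and ignores watermarks and triggers; the function name sum_per_session and the explicit timestamps are assumptions made for the illustration.

```python
from collections import defaultdict

GAP_SECONDS = 15  # same gap as beam.window.Sessions(15)


def sum_per_session(events, gap=GAP_SECONDS):
    """Sum values per key and per session.

    events: iterable of (key, timestamp_seconds, value).
    A new session starts when two consecutive events for the same key
    are at least `gap` seconds apart.
    Returns {key: [total_of_first_session, total_of_second_session, ...]}.
    """
    by_key = defaultdict(list)
    for key, ts, value in events:
        by_key[key].append((ts, value))

    totals = {}
    for key, items in by_key.items():
        items.sort()  # order each key's events by timestamp
        sessions = []
        last_ts = None
        for ts, value in items:
            if last_ts is None or ts - last_ts >= gap:
                sessions.append(0.0)  # gap exceeded: open a new session
            sessions[-1] += value
            last_ts = ts
        totals[key] = sessions
    return totals


events = [
    ("a", 0, 2.0),
    ("b", 5, 1.0),
    ("a", 10, 3.0),   # within 15 s of the previous "a" event: same session
    ("a", 30, 1.5),   # 20 s after the previous "a" event: new session
]
print(sum_per_session(events))  # {'a': [5.0, 1.5], 'b': [1.0]}
```

Without a key, there is nothing to group sessions by, which is why the keyed (id, value) pairs matter before the window and combine steps.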
