未更新侧面输入数据

2024-05-04 21:29:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用动态配置数据构建一个管道,该数据在触发时会得到更新

共有2个子主题,主题A为物联网数据,主题B为将用于转换物联网数据的配置

配置保存在Cloud Firestore中。当数据库更新时,云函数将读取更新后的配置并将其发送给PubSub-topic B

问题在于数据流作业仅在作业开始时读取配置数据,并且永远不会更新

我如何才能更新侧面输入

p = beam.Pipeline(options=options)

class Transform(beam.DoFn):
    def process(self, configuration):
        ...
        yield output

def run():
    ...
    iot_data = (p
        | 'ReadIotData' >> ReadFromPubSub(TOPIC_A)

    configuration = (p
        | 'ReadConfig' >> ReadFromPubSub(TOPIC_B)
        | 'WindowUserData' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'JsonLoadsUserData' >> beam.Map(lambda x: ('data', x.decode().replace('\\','')))

    output = (iot_data
        | 'transform' >> beam.ParDo(Transform(),
            beam.pvalue.AsDict(configuration))
        | 'Output' >> WriteToPubSub(TOPIC_C)

Tags: 数据主题outputdatatopicdef作业transform