No user state context available in Google Cloud Dataflow?

Posted 2024-10-04 03:24:14


I am trying to run a stateful aggregation DoFn on Google Cloud Dataflow, whose capability matrix lists stateful DoFns, but I get the following error:

Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.

The error is raised by this DoFn:

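from typing import Dict, TypeVar

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
from apache_beam.typehints import with_input_types, with_output_types

K = TypeVar('K')
V = TypeVar('V')


@with_input_types(Dict[K, V])
@with_output_types(Dict[K, V])
class StatefulCombineDoFn(beam.DoFn):

    # Buffers every element seen for the current key and window.
    BUFFER = BagStateSpec(
        'buffer',
        PickleCoder()
    )

    # Running aggregate. CombineFn() is a placeholder for a concrete CombineFn.
    STATE = CombiningValueStateSpec(
        'state',
        PickleCoder(),
        beam.CombineFn()
    )

    # Event-time timer that fires once the watermark passes the set timestamp.
    EXPIRY_TIMER = TimerSpec(
        'expiry',
        TimeDomain.WATERMARK
    )

    def process(
            self,
            element,
            w=beam.DoFn.WindowParam,
            buffer=beam.DoFn.StateParam(BUFFER),
            state=beam.DoFn.StateParam(STATE),
            expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
    ):
        # self.allowed_lateness is assumed to be set elsewhere (e.g. in __init__).
        expiry_timer.set(w.end + self.allowed_lateness)
        buffer.add(element)
        state.add(element)

    @on_timer(EXPIRY_TIMER)
    def expiry(
            self,
            state=beam.DoFn.StateParam(STATE),
            buffer=beam.DoFn.StateParam(BUFFER)
    ):
        events = buffer.read()
        info = state.read()

        yield [(info, events)]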

How can I work around this?


1 answer
User
#1 · Posted 2024-10-04 03:24:14

Unfortunately, the Dataflow runner does not currently support user state and timers. I will update this answer once it does.

For now, the runners that support this are the portable Flink runner and the Direct runner.
