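I'm trying to run a stateful aggregation DoFn on Google Cloud Dataflow, which the capability matrix lists as supporting stateful DoFns, but I get the following error: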
Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.
The error above is raised here:
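import typing
from typing import Any, List, Tuple

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
from apache_beam.typehints import with_input_types, with_output_types
from apache_beam.utils.timestamp import Duration

K = typing.TypeVar('K')
V = typing.TypeVar('V')


# Stateful DoFns only run on keyed input: each element must be a (key, value) pair.
@with_input_types(Tuple[K, V])
@with_output_types(Tuple[Any, List[V]])
class StatefulCombineDoFn(beam.DoFn):
    # Buffers every raw value seen for the current key and window.
    BUFFER = BagStateSpec('buffer', PickleCoder())
    # Running aggregate. CountCombineFn is a placeholder assumption: the bare
    # abstract CombineFn() cannot combine anything, so substitute your own subclass.
    STATE = CombiningValueStateSpec(
        'state', PickleCoder(), beam.combiners.CountCombineFn())
    # Event-time timer that fires once the watermark passes the window end.
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

    def __init__(self, allowed_lateness_secs=0):
        super().__init__()
        # The original referenced self.allowed_lateness without defining it.
        self.allowed_lateness = Duration(seconds=allowed_lateness_secs)

    def process(
        self,
        element,
        w=beam.DoFn.WindowParam,
        buffer=beam.DoFn.StateParam(BUFFER),
        state=beam.DoFn.StateParam(STATE),
        expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)
    ):
        _, value = element
        expiry_timer.set(w.end + self.allowed_lateness)
        # The original added an undefined name `event`; use the element's value.
        buffer.add(value)
        state.add(value)

    @on_timer(EXPIRY_TIMER)
    def expiry(
        self,
        state=beam.DoFn.StateParam(STATE),
        buffer=beam.DoFn.StateParam(BUFFER)
    ):
        events = list(buffer.read())
        info = state.read()
        # Clear state so it is not carried over after the timer fires.
        buffer.clear()
        state.clear()
        yield (info, events)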
How can I get around this?
Unfortunately, the Dataflow runner does not currently support user state and timers. I will update this answer once it does.
Currently, the runners that support this are the portable Flink runner and the Direct runner.