我试图在apache beam进程中添加一些异常处理
它运行在谷歌云数据流的流媒体作业上
这是一个简单的基于python的窗口滑动,可以读取json对象,将其分组并发送回
它已经成功运行了几个月,没有任何调整,但几天前就被卡住了
流媒体作业显然会因为未处理的异常而暂停,我想这就是正在发生的情况,可能是因为一个损坏的记录
我想我应该在日志中添加一些异常处理,然后丢弃损坏的记录,然后进行调查
但是,我不知道如何添加异常处理 代码是:
class GroupWindowsIntoBatches(beam.PTransform):
"""A composite transform that groups Pub/Sub messages based on publish
time and outputs a list of dictionaries, where each contains one message.
"""
def __init__(self, window_size):
self.window_size = int(window_size.get() * 60)
def expand(self, pcoll):
return (pcoll |
"Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size)) |
"Add classname Key" >> beam.Map(lambda elem: (json.loads(ast.literal_eval(elem.decode("utf-8")))['dw_classname'], elem)) |
"Groupby" >> beam.GroupByKey())
我不确定哪一部分失败了,在整个过程中添加一个简单的try/except不会起作用,因为它似乎会产生一个Java错误,我无法简单地捕捉到这个错误。我还想知道是哪些因素造成了问题
下面是我试图捕捉的java回溯:
> Error message from worker: java.lang.RuntimeException:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: call already cancelled
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:107)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:120)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: call already cancelled
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
> org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:100)
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:203)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:107)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:120)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
代码相当简单,但我不知道如何捕获异常,我对ApacheBeam非常陌生,如果有人有见解,我将非常感激
目前没有回答
相关问题 更多 >
编程相关推荐