Adding exception handling to a windowed Apache Beam job in Python

Posted 2024-09-29 23:14:54


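I'm trying to add some exception handling to an Apache Beam pipeline.

It runs as a streaming job on Google Cloud Dataflow.

It is a simple Python-based windowing job that reads JSON objects, groups them, and sends them back out.

It ran successfully for several months without any changes, but it got stuck a few days ago.

Streaming jobs apparently stall on unhandled exceptions, and I think that is what is happening here, possibly because of a corrupted record.

I figured I should add some exception handling that logs the corrupted record and then discards it, so that I can investigate afterwards.

However, I don't know how to add the exception handling. The code is: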

import ast
import json

import apache_beam as beam
from apache_beam import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message.
    """
    def __init__(self, window_size):
        super().__init__()
        # window_size exposes .get() and is presumably in minutes; FixedWindows takes seconds.
        self.window_size = int(window_size.get() * 60)

    def expand(self, pcoll):
        return (pcoll |
                "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size)) |
                "Add classname Key" >> beam.Map(lambda elem: (json.loads(ast.literal_eval(elem.decode("utf-8")))['dw_classname'], elem)) |
                "Groupby" >> beam.GroupByKey())

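I'm not sure which part is failing. Wrapping the whole thing in a simple try/except won't work, because it seems to produce a Java error that I can't simply catch. I'd also like to know which elements are causing the problem.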
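One common way to approach this is to catch exceptions per element inside the Python function that parses each message, rather than around the pipeline as a whole. The sketch below is only an illustration under assumptions: the names extract_key and safe_extract_key are hypothetical, the sample payloads are made up, and it assumes the failure really originates in the decode/parse step, which the Java traceback alone does not confirm. It uses only the standard library, so it can be tried outside Beam.

```python
import ast
import json
import logging


def extract_key(elem):
    """Return (dw_classname, elem), mirroring the lambda in the question."""
    payload = json.loads(ast.literal_eval(elem.decode("utf-8")))
    return payload["dw_classname"], elem


def safe_extract_key(elem):
    """Yield ("ok", (key, elem)) or ("bad", (elem, error_text)).

    Any exception raised while decoding or parsing is logged and turned
    into a "bad" result, so one malformed record cannot fail the bundle.
    """
    try:
        yield "ok", extract_key(elem)
    except Exception as exc:  # deliberately broad: bad input can fail in many ways
        logging.warning("Discarding malformed record %r: %r", elem, exc)
        yield "bad", (elem, repr(exc))


# Hypothetical sample inputs: one valid record and three broken ones.
good = b'\'{"dw_classname": "Order"}\''
samples = [good, b'"not json"', b"\xff", b'\'{"other": 1}\'']
results = [next(safe_extract_key(e)) for e in samples]
```

Inside the pipeline, a generator like this could be passed to beam.FlatMap in place of the beam.Map lambda, with the "ok" and "bad" results then separated (for example by yielding beam.pvalue.TaggedOutput for the bad records and calling .with_outputs() on the transform), so that only well-formed pairs reach GroupByKey and the bad records can be written somewhere for inspection.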

Here is the Java traceback I'm trying to catch:

> Error message from worker: java.lang.RuntimeException:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: call already cancelled
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:107)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:120)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: call already cancelled
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
> org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:100)
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:203)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:107)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:95)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:120)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)

The code is fairly simple, but I don't know how to catch the exception. I'm very new to Apache Beam, so I'd really appreciate any insight.


Tags: org, core, grpc, apache, util, java, common, beam
