有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java是否有任何方法可以确保在Flink on job cancel with savepoint上通知所有检查点侦听器检查点完成?

我正在使用flink 1.9和REST API /jobs/:jobid/savepoints触发保存点并取消作业(优雅地停止作业,以便稍后从保存点运行)

我在源代码中使用了两阶段提交函数,因此我的源代码实现了CheckpointedFunctionCheckpointListener接口。在snapshotState()方法调用时,我将内部状态和notifyCheckpointComplete()检查点状态快照到第三方系统

从源代码中可以看出,只有snapshotState()部分在CheckpointCoordinator中是同步的-

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

检查点确认和完成通知在^{中是异步的

也就是说,当savepointcancel-job设置为truesavepoint被触发时,在拍摄快照之后,一些任务管理器会在作业取消和执行notifyCheckpointComplete()之前收到完成通知,而一些任务管理器则不会

问题是,是否有一种方法可以使用保存点取消作业,从而保证在作业取消之前所有任务管理器都会调用notifyCheckpointComplete(),或者目前没有方法实现这一点


共 (2) 个答案

  1. # 2 楼答案

    我已经有一段时间没看Flink 1.9了,所以请谨慎对待我的回答

    我猜你的消息来源取消得太早了。因此notifyCheckpointComplete实际上被发送到所有任务,但是一些SourceFunction已经退出run,相应的任务被清理

    好吧,如果你在收到最后一个notifyCheckpointComplete之前忽略取消和中断,你所描述的应该是可能的

    class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
        private volatile boolean canceled = false;
        private volatile boolean pendingCheckpoint = false;
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            pendingCheckpoint = true;
            // start two-phase commit
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
    
        }
    
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // finish two-phase commit
            pendingCheckpoint = false;
        }
    
        @Override
        public void run(SourceContext<Object> ctx) throws Exception {
            while (!canceled) {
                // do normal source stuff
            }
            // keep the task running after cancellation
            while (pendingCheckpoint) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // ignore interruptions until two-phase commit is done
                }
            }
        }
    
        @Override
        public void cancel() {
            canceled = true;
        }
    }