This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 3dff78c [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged 3dff78c is described below commit 3dff78c87ffbfe6575afb4642b54e2cc14cc4aa1 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon May 17 22:03:21 2021 +0800 [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged ### What is this PR for? Simple PR to check whether the current latest checkpoint is changed, only update paragraph config when it is changed. ### What type of PR is it? [Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5374 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4120 from zjffdu/ZEPPELIN-5374 and squashes the following commits: 79f269f24e [Jeff Zhang] [ZEPPELIN-5374] Don't update paragraph config when latest checkpoint of flink is unchanged (cherry picked from commit b173590a62144bfa20d4822b99283677edd61c16) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../src/main/java/org/apache/zeppelin/flink/JobManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java index 5f40569..7242b04 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -188,6 +188,7 @@ public class JobManager { private AtomicBoolean running = new AtomicBoolean(true); private boolean isFirstPoll = true; private long checkInterval; + private String latestCheckpointPath; FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context, long checkInterval) { this.flinkWebUrl = flinkWebUrl; @@ -253,11 +254,12 @@ public class JobManager { if (completedObject.has("external_path")) { String checkpointPath = completedObject.getString("external_path"); LOGGER.debug("Latest checkpoint path: {}", checkpointPath); - if (!StringUtils.isBlank(checkpointPath)) { + if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)) { Map<String, String> config = new HashMap<>(); config.put(LATEST_CHECKPOINT_PATH, checkpointPath); context.getIntpEventClient().updateParagraphConfig( context.getNoteId(), context.getParagraphId(), config); + latestCheckpointPath = checkpointPath; } } }