This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new c4e6417 [ZEPPELIN-5284]. savepoint & checkpoint don't work in flink 1.12 c4e6417 is described below commit c4e6417f1bcb3049488d81bac42020571aa3dbb8 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Mar 11 11:30:12 2021 +0800 [ZEPPELIN-5284]. savepoint & checkpoint don't work in flink 1.12 ### What is this PR for? The root cause of this issue is flink 1.12 use copied version of Configuration, so that the previous method of updating configuration doesn't' work for flink 1.12. In this PR, I get the configuration via reflection and update it directly. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5284 ### How should this be tested? * Manually tested ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4076 from zjffdu/ZEPPELIN-5284 and squashes the following commits: 08426a735 [Jeff Zhang] [ZEPPELIN-5284]. savepoint & checkpoint don't work in flink 1.12 (cherry picked from commit 5365d28f870638bb1a1ccfec8da223d2726c6ab8) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../apache/zeppelin/flink/FlinkScalaInterpreter.scala | 17 +++++++++++++---- .../zeppelin/flink/FlinkStreamSqlInterpreterTest.java | 9 ++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index 4084f1f..25bfcfc 100644 --- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -651,9 +651,12 @@ class FlinkScalaInterpreter(val properties: Properties) { def setSavepointPathIfNecessary(context: InterpreterContext): Unit = { val savepointPath = context.getConfig.getOrDefault(JobManager.SAVEPOINT_PATH, "").toString val resumeFromSavepoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_SAVEPOINT, false) + // flink 1.12 use copied version of configuration, so in order to update configuration we have to + // get the internal configuration of StreamExecutionEnvironment. + val internalConfiguration = getConfigurationOfStreamExecutionEnv() if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint){ LOGGER.info("Resume job from savepoint , savepointPath = {}", savepointPath) - configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), savepointPath) + internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), savepointPath); return } @@ -661,7 +664,7 @@ class FlinkScalaInterpreter(val properties: Properties) { val resumeFromLatestCheckpoint = context.getBooleanLocalProperty(JobManager.RESUME_FROM_CHECKPOINT, false) if (!StringUtils.isBlank(checkpointPath) && resumeFromLatestCheckpoint) { LOGGER.info("Resume job from checkpoint , checkpointPath = {}", checkpointPath) - configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath) + internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath); return } @@ -669,14 +672,14 @@ class FlinkScalaInterpreter(val properties: Properties) { SavepointConfigOptions.SAVEPOINT_PATH.key(), "") if (!StringUtils.isBlank(userSavepointPath)) { LOGGER.info("Resume job from user set savepoint , savepointPath = {}", userSavepointPath) - configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath) + internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), checkpointPath) return; } val userSettingSavepointPath = properties.getProperty(SavepointConfigOptions.SAVEPOINT_PATH.key()) if (StringUtils.isBlank(userSettingSavepointPath)) { // remove SAVEPOINT_PATH when user didn't set it via %flink.conf - configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH) + internalConfiguration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH) } } @@ -887,6 +890,12 @@ class FlinkScalaInterpreter(val properties: Properties) { val pattern(prefix, remaining) = webURL yarnAddress + remaining } + + private def getConfigurationOfStreamExecutionEnv(): Configuration = { + val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration") + getConfigurationMethod.setAccessible(true) + getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration] + } } diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index fc7c206..e392816 100644 --- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -331,13 +331,8 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { result = sqlInterpreter.interpret("select url, count(1) as pv from " + "log group by url", context); - if (flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.12.0"))) { - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint")); - } else { - // flink 1.12 would start from scratch if save point is not found. - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertTrue(context.out.toString(), context.out.toString().contains("Cannot find checkpoint or savepoint")); } @Test