This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new c4e6417  [ZEPPELIN-5284]. savepoint & checkpoint don't work in flink 
1.12
c4e6417 is described below

commit c4e6417f1bcb3049488d81bac42020571aa3dbb8
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Mar 11 11:30:12 2021 +0800

    [ZEPPELIN-5284]. savepoint & checkpoint don't work in flink 1.12
    
    ### What is this PR for?
    
    The root cause of this issue is that Flink 1.12 uses a copied version of 
Configuration, so the previous method of updating the configuration doesn't 
work for Flink 1.12. In this PR, I get the configuration via reflection and 
update it directly.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5284
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #4076 from zjffdu/ZEPPELIN-5284 and squashes the following commits:
    
    08426a735 [Jeff Zhang] [ZEPPELIN-5284]. savepoint & checkpoint don't work 
in flink 1.12
    
    (cherry picked from commit 5365d28f870638bb1a1ccfec8da223d2726c6ab8)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../apache/zeppelin/flink/FlinkScalaInterpreter.scala   | 17 +++++++++++++----
 .../zeppelin/flink/FlinkStreamSqlInterpreterTest.java   |  9 ++-------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 4084f1f..25bfcfc 100644
--- 
a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -651,9 +651,12 @@ class FlinkScalaInterpreter(val properties: Properties) {
   def setSavepointPathIfNecessary(context: InterpreterContext): Unit = {
     val savepointPath = 
context.getConfig.getOrDefault(JobManager.SAVEPOINT_PATH, "").toString
     val resumeFromSavepoint = 
context.getBooleanLocalProperty(JobManager.RESUME_FROM_SAVEPOINT, false)
+    // flink 1.12 use copied version of configuration, so in order to update 
configuration we have to
+    // get the internal configuration of StreamExecutionEnvironment.
+    val internalConfiguration = getConfigurationOfStreamExecutionEnv()
     if (!StringUtils.isBlank(savepointPath) && resumeFromSavepoint){
       LOGGER.info("Resume job from savepoint , savepointPath = {}", 
savepointPath)
-      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
savepointPath)
+      
internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
savepointPath);
       return
     }
 
@@ -661,7 +664,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val resumeFromLatestCheckpoint = 
context.getBooleanLocalProperty(JobManager.RESUME_FROM_CHECKPOINT, false)
     if (!StringUtils.isBlank(checkpointPath) && resumeFromLatestCheckpoint) {
       LOGGER.info("Resume job from checkpoint , checkpointPath = {}", 
checkpointPath)
-      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
checkpointPath)
+      
internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
checkpointPath);
       return
     }
 
@@ -669,14 +672,14 @@ class FlinkScalaInterpreter(val properties: Properties) {
       SavepointConfigOptions.SAVEPOINT_PATH.key(), "")
     if (!StringUtils.isBlank(userSavepointPath)) {
       LOGGER.info("Resume job from user set savepoint , savepointPath = {}", 
userSavepointPath)
-      configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
checkpointPath)
+      
internalConfiguration.setString(SavepointConfigOptions.SAVEPOINT_PATH.key(), 
checkpointPath)
       return;
     }
 
     val userSettingSavepointPath = 
properties.getProperty(SavepointConfigOptions.SAVEPOINT_PATH.key())
     if (StringUtils.isBlank(userSettingSavepointPath)) {
       // remove SAVEPOINT_PATH when user didn't set it via %flink.conf
-      configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
+      internalConfiguration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH)
     }
   }
 
@@ -887,6 +890,12 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val pattern(prefix, remaining) = webURL
     yarnAddress + remaining
   }
+
+  private def getConfigurationOfStreamExecutionEnv(): Configuration = {
+    val getConfigurationMethod = 
classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration")
+    getConfigurationMethod.setAccessible(true)
+    
getConfigurationMethod.invoke(this.senv.getJavaEnv).asInstanceOf[Configuration]
+  }
 }
 
 
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index fc7c206..e392816 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -331,13 +331,8 @@ public class FlinkStreamSqlInterpreterTest extends 
SqlInterpreterTest {
     result = sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
 
-    if 
(flinkInterpreter.getFlinkVersion().olderThan(FlinkVersion.fromVersionString("1.12.0")))
 {
-      assertEquals(InterpreterResult.Code.ERROR, result.code());
-      assertTrue(context.out.toString(), 
context.out.toString().contains("Cannot find checkpoint or savepoint"));
-    } else {
-      // flink 1.12 would start from scratch if save point is not found.
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    }
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(context.out.toString(), context.out.toString().contains("Cannot 
find checkpoint or savepoint"));
   }
 
   @Test

Reply via email to