This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 933ca6b  [ZEPPELIN-4886] [flink] Add property to start flink job from savepoint
933ca6b is described below

commit 933ca6baa98998a37c14a00161ec23f04d0a80bc
Author: 1721563...@qq.com <a18652727118>
AuthorDate: Fri Jun 26 10:26:46 2020 +0800

    [ZEPPELIN-4886] [flink] Add property to start flink job from savepoint
    
    ### What is this PR for?
    Add property to start flink job from savepoint
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    [ZEPPELIN-4886] https://issues.apache.org/jira/browse/ZEPPELIN-4886
    
    ### How should this be tested?
    GUI integration tests
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    No Questions.
    
    Author: 1721563...@qq.com <a18652727118>
    
    Closes #3818 from lonelyGhostisdog/ZEPPELIN-4886 and squashes the following commits:
    
    6c84b86c8 [1721563...@qq.com] [ZEPPELIN-4886] Remove System.out and add test for resume from invalid savepointPath
    c883037fc [1721563...@qq.com] [ZEPPELIN-4886] Add test for resume from exist savepointPath
    166ac4942 [1721563...@qq.com] [ZEPPELIN-4886] Add docs for the new property
    d69bd93be [1721563...@qq.com] [ZEPPELIN-4886] Add property to start flink job from savepoint.
---
 docs/interpreter/flink.md                          |  5 ++
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  8 +++
 .../flink/FlinkStreamSqlInterpreterTest.java       | 83 ++++++++++++++++++++++
 3 files changed, 96 insertions(+)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 108d7b5..5eb7f5c 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -457,6 +457,11 @@ In this section, we will list and explain all the supported local properties in
     <td>If you specify it, then when you cancel your flink job in Zeppelin, it would also do savepoint and store state in this directory. And when you resume your job, it would resume from this savepoint.</td>
   </tr>
   <tr>
+    <td>savepointPath</td>
+    <td></td>
+    <td>If you specify it, then when you resume your job, it would resume from this savepointPath .</td>
+  </tr>
+  <tr>
     <td>runAsOne</td>
     <td>false</td>
     <td>All the insert into sql will run in a single flink job if this is true.</td>
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 1175b7b..1307108 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -647,6 +647,14 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def setSavePointIfNecessary(context: InterpreterContext): Unit = {
     val savepointDir = context.getLocalProperties.get("savepointDir")
+    val savepointPath = context.getLocalProperties.get("savepointPath");
+
+    if (!StringUtils.isBlank(savepointPath)){
+      LOGGER.info("savepointPath has been setup by user , savepointPath = {}", 
savepointPath)
+      configuration.setString("execution.savepoint.path", savepointPath)
+      return
+    }
+
     if (!StringUtils.isBlank(savepointDir)) {
       val savepointPath = z.angular(context.getParagraphId + "_savepointpath", 
context.getNoteId, null)
       if (savepointPath == null) {
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 058bcee..f83ee0a 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -27,6 +27,7 @@ import 
org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
@@ -254,6 +255,88 @@ public class FlinkStreamSqlInterpreterTest extends 
SqlInterpreterTest {
   }
 
   @Test
+  public void testResumeStreamSqlFromExistSavePointPath() throws IOException, 
InterpreterException, InterruptedException, TimeoutException {
+    String initStreamScalaScript = getInitStreamScript(1000);
+    InterpreterResult result = 
flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File savePointDir = FileUtils.getTempDirectory();
+    final Waiter waiter = new Waiter();
+    Thread thread = new Thread(() -> {
+      try {
+        InterpreterContext context = getInterpreterContext();
+        context.getLocalProperties().put("type", "update");
+        context.getLocalProperties().put("savepointDir", 
savePointDir.getAbsolutePath());
+        context.getLocalProperties().put("parallelism", "1");
+        context.getLocalProperties().put("maxParallelism", "10");
+        InterpreterResult result2 = sqlInterpreter.interpret("select url, 
count(1) as pv from " +
+                "log group by url", context);
+        waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+        waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+      } catch (Exception e) {
+        e.printStackTrace();
+        waiter.fail("Should not fail here");
+      }
+      waiter.resume();
+    });
+    thread.start();
+
+    // the streaming job will run for 20 seconds. check init_stream.scala
+    // sleep 10 seconds to make sure the job is started but not finished
+    Thread.sleep(10 * 1000);
+
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "update");
+    context.getLocalProperties().put("savepointDir", 
savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("parallelism", "2");
+    context.getLocalProperties().put("maxParallelism", "10");
+    sqlInterpreter.cancel(context);
+    waiter.await(10 * 1000);
+
+    // get exist savepoint path from tempDirectory
+    // if dir more than 1 then get first or throw error
+    String[] allSavepointPath = savePointDir.list((dir, name) -> 
name.startsWith("savepoint"));
+    assertTrue(allSavepointPath.length>0);
+
+    String savepointPath = 
savePointDir.getAbsolutePath().concat(File.separator).concat(allSavepointPath[0]);
+
+    // resume job from exist savepointPath
+    context.getLocalProperties().put("savepointPath",savepointPath);
+    sqlInterpreter.interpret("select url, count(1) as pv from " +
+            "log group by url", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, 
resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+
+  }
+
+  @Test
+  public void testResumeStreamSqlFromInvalidSavePointPath() throws 
IOException, InterpreterException, InterruptedException, TimeoutException {
+    String initStreamScalaScript = getInitStreamScript(1000);
+    InterpreterResult result = 
flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File savepointPath = FileUtils.getTempDirectory();
+    InterpreterContext context = getInterpreterContext();
+    context.getLocalProperties().put("type", "update");
+    context.getLocalProperties().put("savepointPath", 
savepointPath.getAbsolutePath());
+    context.getLocalProperties().put("parallelism", "1");
+    context.getLocalProperties().put("maxParallelism", "10");
+    InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) 
as pv from " +
+            "log group by url", context);
+
+    // due to invalid savepointPath, failed to submit job and throw exception
+    assertEquals(InterpreterResult.Code.ERROR, result2.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    assertTrue(resultMessages.toString().contains("Failed to submit job."));
+
+  }
+
+  @Test
   public void testStreamUDF() throws IOException, InterpreterException {
     String initStreamScalaScript = getInitStreamScript(100);
     InterpreterResult result = 
flinkInterpreter.interpret(initStreamScalaScript,

Reply via email to