This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new d2b6618 [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties d2b6618 is described below commit d2b66188232fd1481b3c1196adf1defe847d07c6 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sat Jun 6 23:35:26 2020 +0800 [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties ### What is this PR for? Minor PR which allow user to specify job name as paragraph local properties, by default it is the sql statement if user don't specify it. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4865 ### How should this be tested? * CI pass and manually tested. ### Screenshots (if appropriate)  ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3792 from zjffdu/ZEPPELIN-4865 and squashes the following commits: 7c6e2f5c9 [Jeff Zhang] [ZEPPELIN-4865]. Allow specify jobName as paragraph local properties (cherry picked from commit 6bea350b804586ec5365603da27e62265986608e) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../src/main/java/org/apache/zeppelin/flink/FlinkShims.java | 2 +- .../src/main/java/org/apache/zeppelin/flink/Flink110Shims.java | 4 ++-- .../src/main/java/org/apache/zeppelin/flink/Flink111Shims.java | 2 +- .../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 6 ++++-- .../java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java | 5 +++-- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index ef5f0a0..274bf2c 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -89,7 +89,7 @@ public abstract class FlinkShims { public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception; - public abstract boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception; + public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception; public abstract boolean rowEquals(Object row1, Object row2); diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index dec3560..f6d506a 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -109,8 +109,8 @@ public class Flink110Shims extends FlinkShims { } @Override - public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { - ((TableEnvironment) tblEnv).execute(sql); + public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { + ((TableEnvironment) tblEnv).execute(jobName); return true; } diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index ea11ced..2480c69 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -116,7 +116,7 @@ public class Flink111Shims extends FlinkShims { } @Override - public boolean executeMultipleInsertInto(String sql, Object tblEnv, InterpreterContext context) throws Exception { + public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); while(!jobClient.getJobStatus().get().isTerminalState()) { LOGGER.debug("Wait for job to finish"); diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index 1e8e803..f2d31b6 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -223,7 +223,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { if (runAsOne) { try { lock.lock(); - if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(st, this.tbenv, context)) { + String jobName = context.getStringLocalProperty("jobName", st); + if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) { context.out.write("Insertion successfully.\n"); } } catch (Exception e) { @@ -532,7 +533,8 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); if (!runAsOne) { this.tbenv.sqlUpdate(sql); - this.tbenv.execute(sql); + String jobName = context.getStringLocalProperty("jobName", sql); + this.tbenv.execute(jobName); context.out.write("Insertion successfully.\n"); } else { flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context); diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 2d98ef7..a8728d3 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -157,8 +157,9 @@ public abstract class AbstractStreamSqlJob { retrievalThread.start(); LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism); - stenv.execute(tableName); - LOGGER.info("Flink Job is finished, tableName: " + tableName); + String jobName = context.getStringLocalProperty("jobName", tableName); + stenv.execute(jobName); + LOGGER.info("Flink Job is finished, jobName: " + jobName); // wait for retrieve thread consume all data LOGGER.info("Waiting for retrieve thread to be done"); retrievalThread.join();