This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 64865b7 [ZEPPELIN-5280]. Use update as the default type of %flink.ssql 64865b7 is described below commit 64865b7da663aaef98d8b9586e2accd1e01c082b Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sat Mar 6 18:32:25 2021 +0800 [ZEPPELIN-5280]. Use update as the default type of %flink.ssql ### What is this PR for? Trivial PR to make `update` the default type of `%flink.ssql`, so that users don't need to specify the type in most cases. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5280 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #4181 from zjffdu/ZEPPELIN-5280 and squashes the following commits: e473fb8ed3 [Jeff Zhang] [ZEPPELIN-5280]. 
Use update as the default type of %flink.ssql --- .../org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java | 5 +---- .../java/org/apache/zeppelin/flink/FlinkInterpreterTest.java | 4 ---- .../apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java | 10 ---------- .../org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java | 4 ---- 4 files changed, 1 insertion(+), 22 deletions(-) diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java index dd85272..60c5c5a 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java @@ -56,10 +56,7 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter { @Override public void callInnerSelect(String sql, InterpreterContext context) throws IOException { - String streamType = context.getLocalProperties().get("type"); - if (streamType == null) { - throw new IOException("type must be specified for stream sql"); - } + String streamType = context.getLocalProperties().getOrDefault("type", "update"); if (streamType.equalsIgnoreCase("single")) { SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob( flinkInterpreter.getStreamExecutionEnvironment(), diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index ceced31..8649138 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -309,7 +309,6 @@ public class FlinkInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = getInterpreterContext(); - 
context.getLocalProperties().put("type", "update"); InterpreterResult result2 = interpreter.interpret( "val table = stenv.sqlQuery(\"select url, count(1) as pv from " + "log group by url\")\nz.show(table, streamType=\"update\")", context); @@ -330,7 +329,6 @@ public class FlinkInterpreterTest { Thread.sleep(20 * 1000); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); interpreter.cancel(context); waiter.await(10 * 1000); // resume job @@ -356,7 +354,6 @@ public class FlinkInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "1"); context.getLocalProperties().put("maxParallelism", "10"); @@ -380,7 +377,6 @@ public class FlinkInterpreterTest { Thread.sleep(20 * 1000); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "2"); context.getLocalProperties().put("maxParallelism", "10"); diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java index 382a9b9..98604cd 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java @@ -95,7 +95,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); 
result = sqlInterpreter.interpret("select url, count(1) as pv from " + "log group by url", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -171,7 +170,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " + "log group by url", context); waiter.assertTrue(context.out.toString().contains("Job was cancelled")); @@ -189,7 +187,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread.sleep(10 * 1000); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); sqlInterpreter.cancel(context); waiter.await(10 * 1000); // resume job @@ -215,7 +212,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "1"); context.getLocalProperties().put("maxParallelism", "10"); @@ -238,7 +234,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread.sleep(10 * 1000); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "2"); context.getLocalProperties().put("maxParallelism", "10"); @@ -267,7 +262,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); 
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "1"); context.getLocalProperties().put("maxParallelism", "10"); @@ -288,7 +282,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { Thread.sleep(10 * 1000); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "2"); context.getLocalProperties().put("maxParallelism", "10"); @@ -322,7 +315,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("parallelism", "1"); context.getLocalProperties().put("maxParallelism", "10"); context.getLocalProperties().put(JobManager.RESUME_FROM_SAVEPOINT, "true"); @@ -349,7 +341,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); InterpreterContext context = getInterpreterContext(); - context.getLocalProperties().put("type", "update"); result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " + "log group by url", context); assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); @@ -442,7 +433,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest { // runAsOne won't affect the select statement. 
context = getInterpreterContext(); context.getLocalProperties().put("runAsOne", "true"); - context.getLocalProperties().put("type", "update"); result = sqlInterpreter.interpret( "select 1", context); diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java index fb562cd..e1536bd 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java @@ -382,7 +382,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = createInterpreterContext(); - context.getLocalProperties().put("type", "update"); InterpreterResult result2 = interpreter.interpret( "table = st_env.sql_query('select url, count(1) as pv from " + "log group by url')\nz.show(table, stream_type='update')", context); @@ -402,7 +401,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { Thread.sleep(20 * 1000); InterpreterContext context = createInterpreterContext(); - context.getLocalProperties().put("type", "update"); interpreter.cancel(context); waiter.await(10 * 1000); // resume job @@ -426,7 +424,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { Thread thread = new Thread(() -> { try { InterpreterContext context = createInterpreterContext(); - context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "1"); context.getLocalProperties().put("maxParallelism", "10"); @@ -449,7 +446,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest { Thread.sleep(20 * 1000); InterpreterContext context = createInterpreterContext(); - 
context.getLocalProperties().put("type", "update"); context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath()); context.getLocalProperties().put("parallelism", "2"); context.getLocalProperties().put("maxParallelism", "10");