This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new ce748cfa9f [ZEPPELIN-5697] Remove old planner support of flink (#4345) ce748cfa9f is described below commit ce748cfa9f8dcadb3cc80cafe69b0b4b531d9eb5 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Apr 7 20:58:12 2022 +0800 [ZEPPELIN-5697] Remove old planner support of flink (#4345) --- docs/interpreter/flink.md | 20 ++++----------- .../zeppelin/flink/FlinkBatchSqlInterpreter.java | 2 +- .../apache/zeppelin/flink/FlinkInterpreter.java | 6 ++--- .../zeppelin/flink/FlinkStreamSqlInterpreter.java | 4 +-- .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 +-- .../apache/zeppelin/flink/PyFlinkInterpreter.java | 4 +-- .../src/main/resources/python/zeppelin_ipyflink.py | 6 ++--- .../src/main/resources/python/zeppelin_pyflink.py | 6 ++--- .../zeppelin/flink/FlinkScalaInterpreter.scala | 29 +++++----------------- 9 files changed, 25 insertions(+), 56 deletions(-) diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md index 029394ce4e..fa86d8308e 100644 --- a/docs/interpreter/flink.md +++ b/docs/interpreter/flink.md @@ -385,21 +385,13 @@ Here are the builtin variables created in Flink Scala shell. * senv (StreamExecutionEnvironment), * benv (ExecutionEnvironment) -* stenv (StreamTableEnvironment for blink planner) -* btenv (BatchTableEnvironment for blink planner) -* stenv_2 (StreamTableEnvironment for flink planner) -* btenv_2 (BatchTableEnvironment for flink planner) +* stenv (StreamTableEnvironment for blink planner (aka. new planner)) +* btenv (BatchTableEnvironment for blink planner (aka. new planner)) * z (ZeppelinContext) ### Blink/Flink Planner -There are 2 planners supported by Flink SQL: `flink` & `blink`. - -* If you want to use DataSet api, and convert it to Flink table then please use `flink` planner (`btenv_2` and `stenv_2`). -* In other cases, we would always recommend you to use `blink` planner. This is also what Flink batch/streaming sql interpreter use (`%flink.bsql` & `%flink.ssql`) - -Check this [page](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners) for the difference between flink planner and blink planner. - +After Zeppelin 0.11, we remove the support of flink planner (aka. old planner) which is also removed after Flink 1.14. ### Stream WordCount Example @@ -537,10 +529,8 @@ These are variables created in Python shell. * `s_env` (StreamExecutionEnvironment), * `b_env` (ExecutionEnvironment) -* `st_env` (StreamTableEnvironment for blink planner) -* `bt_env` (BatchTableEnvironment for blink planner) -* `st_env_2` (StreamTableEnvironment for flink planner) -* `bt_env_2` (BatchTableEnvironment for flink planner) +* `st_env` (StreamTableEnvironment for blink planner (aka. new planner)) +* `bt_env` (BatchTableEnvironment for blink planner (aka. new planner)) ### Configure PyFlink diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java index dea35bfbe1..f720ff255d 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java @@ -38,7 +38,7 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterpreter { flinkInterpreter.getExecutionEnvironment().getJavaEnv(), flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(), flinkInterpreter.getJavaBatchTableEnvironment("blink"), - flinkInterpreter.getJavaStreamTableEnvironment("blink"), + flinkInterpreter.getJavaStreamTableEnvironment(), flinkInterpreter.getZeppelinContext(), null); flinkInterpreter.getFlinkShims().initInnerBatchSqlInterpreter(flinkSqlContext); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 7e047ccae0..63c69a004d 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -155,15 +155,15 @@ public class FlinkInterpreter extends Interpreter { } TableEnvironment getStreamTableEnvironment() { - return this.innerIntp.getStreamTableEnvironment("blink"); + return this.innerIntp.getStreamTableEnvironment(); } org.apache.flink.table.api.TableEnvironment getJavaBatchTableEnvironment(String planner) { return this.innerIntp.getJavaBatchTableEnvironment(planner); } - TableEnvironment getJavaStreamTableEnvironment(String planner) { - return this.innerIntp.getJavaStreamTableEnvironment(planner); + TableEnvironment getJavaStreamTableEnvironment() { + return this.innerIntp.getJavaStreamTableEnvironment(); } TableEnvironment getBatchTableEnvironment() { diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java index 6bffb23658..087fa3a208 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java @@ -43,7 +43,7 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter { flinkInterpreter.getExecutionEnvironment().getJavaEnv(), flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(), flinkInterpreter.getJavaBatchTableEnvironment("blink"), - flinkInterpreter.getJavaStreamTableEnvironment("blink"), + flinkInterpreter.getJavaStreamTableEnvironment(), flinkInterpreter.getZeppelinContext(), sql -> callInnerSelect(sql)); @@ -56,7 +56,7 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter { if (streamType.equalsIgnoreCase("single")) { SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob( flinkInterpreter.getStreamExecutionEnvironment(), - flinkInterpreter.getJavaStreamTableEnvironment("blink"), + flinkInterpreter.getJavaStreamTableEnvironment(), flinkInterpreter.getJobManager(), context, flinkInterpreter.getDefaultParallelism(), diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 12bda20383..817d13fa3e 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -150,7 +150,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { return flinkInterpreter.getJavaBatchTableEnvironment(planner); } - public TableEnvironment getJavaStreamTableEnvironment(String planner) { - return flinkInterpreter.getJavaStreamTableEnvironment(planner); + public TableEnvironment getJavaStreamTableEnvironment() { + return flinkInterpreter.getJavaStreamTableEnvironment(); } } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java index 8c38d790f4..3a33cd7c4b 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java @@ -199,7 +199,7 @@ public class PyFlinkInterpreter extends PythonInterpreter { return flinkInterpreter.getJavaBatchTableEnvironment(planner); } - public TableEnvironment getJavaStreamTableEnvironment(String planner) { - return flinkInterpreter.getJavaStreamTableEnvironment(planner); + public TableEnvironment getJavaStreamTableEnvironment() { + return flinkInterpreter.getJavaStreamTableEnvironment(); } } diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py index 367b318660..62cc81bef8 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py @@ -50,11 +50,9 @@ if not intp.isAfterFlink114(): from pyflink.dataset import * b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) - st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment()) else: - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment()) class IPyFlinkZeppelinContext(PyZeppelinContext): diff --git a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py index 173c3b5c85..2970c6d265 100644 --- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py +++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py @@ -39,11 +39,9 @@ if not intp.isAfterFlink114(): from pyflink.dataset import * b_env = pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment()) bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink")) - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) - bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink")) - st_env_2 = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment()) else: - st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink")) + st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment()) from zeppelin_context import PyZeppelinContext diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index d98d7e1b7b..1e9b1dcabb 100644 --- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -86,9 +86,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, private var btenv: TableEnvironment = _ private var stenv: TableEnvironment = _ - // TableEnvironment of flink planner + // TableEnvironment of flink planner (used for convert Flink table to DataSet) private var btenv_2: TableEnvironment = _ - private var stenv_2: TableEnvironment = _ // PyFlink depends on java version of TableEnvironment, // so need to create java version of TableEnvironment @@ -96,9 +95,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, private var java_btenv: TableEnvironment = _ private var java_stenv: TableEnvironment = _ - // java version of flink TableEnvironment + // java version TableEnvironment of old planner, used for converting Table to DataSet private var java_btenv_2: TableEnvironment = _ - private var java_stenv_2: TableEnvironment = _ private var z: FlinkZeppelinContext = _ private var flinkVersion: FlinkVersion = _ @@ -448,15 +446,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties, if (!flinkVersion.isAfterFlink114()) { // flink planner is not supported after flink 1.14 this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment() - flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), btenv_2, List("@transient")) - stEnvSetting = - EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build() - this.stenv_2 = tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, getFlinkClassLoader) - flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), stenv_2, List("@transient")) - this.java_btenv_2 = tblEnvFactory.createJavaFlinkBatchTableEnvironment() - btEnvSetting = EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build - this.java_stenv_2 = tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, getFlinkClassLoader) } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader) @@ -768,11 +758,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, this.btenv_2 } - def getStreamTableEnvironment(planner: String = "blink"): TableEnvironment = { - if (planner == "blink") - this.stenv - else - this.stenv_2 + def getStreamTableEnvironment(): TableEnvironment = { + this.stenv } def getJavaBatchTableEnvironment(planner: String): TableEnvironment = { @@ -783,12 +770,8 @@ abstract class FlinkScalaInterpreter(val properties: Properties, } } - def getJavaStreamTableEnvironment(planner: String): TableEnvironment = { - if (planner == "blink") { - this.java_stenv - } else { - this.java_stenv_2 - } + def getJavaStreamTableEnvironment(): TableEnvironment = { + this.java_stenv } def getDefaultParallelism = this.defaultParallelism