This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new ce748cfa9f [ZEPPELIN-5697] Remove old planner support of flink (#4345)
ce748cfa9f is described below

commit ce748cfa9f8dcadb3cc80cafe69b0b4b531d9eb5
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Apr 7 20:58:12 2022 +0800

    [ZEPPELIN-5697] Remove old planner support of flink (#4345)
---
 docs/interpreter/flink.md                          | 20 ++++-----------
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |  2 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  6 ++---
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |  4 +--
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  4 +--
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  4 +--
 .../src/main/resources/python/zeppelin_ipyflink.py |  6 ++---
 .../src/main/resources/python/zeppelin_pyflink.py  |  6 ++---
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 29 +++++-----------------
 9 files changed, 25 insertions(+), 56 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 029394ce4e..fa86d8308e 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -385,21 +385,13 @@ Here are the builtin variables created in Flink Scala 
shell.
 
 * senv (StreamExecutionEnvironment),
 * benv (ExecutionEnvironment)
-* stenv (StreamTableEnvironment for blink planner)
-* btenv (BatchTableEnvironment for blink planner)
-* stenv_2 (StreamTableEnvironment for flink planner)
-* btenv_2 (BatchTableEnvironment for flink planner)
+* stenv (StreamTableEnvironment for blink planner (aka. new planner))
+* btenv (BatchTableEnvironment for blink planner (aka. new planner))
 * z  (ZeppelinContext)
 
 ### Blink/Flink Planner
 
-There are 2 planners supported by Flink SQL: `flink` & `blink`.
-
-* If you want to use DataSet api, and convert it to Flink table then please 
use `flink` planner (`btenv_2` and `stenv_2`).
-* In other cases, we would always recommend you to use `blink` planner. This 
is also what Flink batch/streaming sql interpreter use (`%flink.bsql` & 
`%flink.ssql`)
-
-Check this 
[page](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners)
 for the difference between flink planner and blink planner.
-
+After Zeppelin 0.11, we remove the support of flink planner (aka. old planner) 
which is also removed after Flink 1.14.
 
 ### Stream WordCount Example
 
@@ -537,10 +529,8 @@ These are variables created in Python shell.
 
 * `s_env`    (StreamExecutionEnvironment),
 * `b_env`     (ExecutionEnvironment)
-* `st_env`   (StreamTableEnvironment for blink planner)
-* `bt_env`   (BatchTableEnvironment for blink planner)
-* `st_env_2`   (StreamTableEnvironment for flink planner)
-* `bt_env_2`   (BatchTableEnvironment for flink planner)
+* `st_env`   (StreamTableEnvironment for blink planner (aka. new planner))
+* `bt_env`   (BatchTableEnvironment for blink planner (aka. new planner))
 
 
 ### Configure PyFlink
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index dea35bfbe1..f720ff255d 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -38,7 +38,7 @@ public class FlinkBatchSqlInterpreter extends 
FlinkSqlInterpreter {
             flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
             flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
             flinkInterpreter.getJavaBatchTableEnvironment("blink"),
-            flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+            flinkInterpreter.getJavaStreamTableEnvironment(),
             flinkInterpreter.getZeppelinContext(),
             null);
     
flinkInterpreter.getFlinkShims().initInnerBatchSqlInterpreter(flinkSqlContext);
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 7e047ccae0..63c69a004d 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -155,15 +155,15 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   TableEnvironment getStreamTableEnvironment() {
-    return this.innerIntp.getStreamTableEnvironment("blink");
+    return this.innerIntp.getStreamTableEnvironment();
   }
 
   org.apache.flink.table.api.TableEnvironment 
getJavaBatchTableEnvironment(String planner) {
     return this.innerIntp.getJavaBatchTableEnvironment(planner);
   }
 
-  TableEnvironment getJavaStreamTableEnvironment(String planner) {
-    return this.innerIntp.getJavaStreamTableEnvironment(planner);
+  TableEnvironment getJavaStreamTableEnvironment() {
+    return this.innerIntp.getJavaStreamTableEnvironment();
   }
 
   TableEnvironment getBatchTableEnvironment() {
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index 6bffb23658..087fa3a208 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -43,7 +43,7 @@ public class FlinkStreamSqlInterpreter extends 
FlinkSqlInterpreter {
             flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
             flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
             flinkInterpreter.getJavaBatchTableEnvironment("blink"),
-            flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+            flinkInterpreter.getJavaStreamTableEnvironment(),
             flinkInterpreter.getZeppelinContext(),
             sql -> callInnerSelect(sql));
 
@@ -56,7 +56,7 @@ public class FlinkStreamSqlInterpreter extends 
FlinkSqlInterpreter {
     if (streamType.equalsIgnoreCase("single")) {
       SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
               flinkInterpreter.getStreamExecutionEnvironment(),
-              flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+              flinkInterpreter.getJavaStreamTableEnvironment(),
               flinkInterpreter.getJobManager(),
               context,
               flinkInterpreter.getDefaultParallelism(),
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 12bda20383..817d13fa3e 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -150,7 +150,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter 
{
     return flinkInterpreter.getJavaBatchTableEnvironment(planner);
   }
 
-  public TableEnvironment getJavaStreamTableEnvironment(String planner) {
-    return flinkInterpreter.getJavaStreamTableEnvironment(planner);
+  public TableEnvironment getJavaStreamTableEnvironment() {
+    return flinkInterpreter.getJavaStreamTableEnvironment();
   }
 }
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 8c38d790f4..3a33cd7c4b 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -199,7 +199,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
     return flinkInterpreter.getJavaBatchTableEnvironment(planner);
   }
 
-  public TableEnvironment getJavaStreamTableEnvironment(String planner) {
-    return flinkInterpreter.getJavaStreamTableEnvironment(planner);
+  public TableEnvironment getJavaStreamTableEnvironment() {
+    return flinkInterpreter.getJavaStreamTableEnvironment();
   }
 }
diff --git 
a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py 
b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
index 367b318660..62cc81bef8 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
@@ -50,11 +50,9 @@ if not intp.isAfterFlink114():
     from pyflink.dataset import *
     b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
     bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
-    st_env = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
-    bt_env_2 = 
BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
-    st_env_2 = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+    st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment())
 else:
-    st_env = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+    st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment())
 
 class IPyFlinkZeppelinContext(PyZeppelinContext):
 
diff --git 
a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py 
b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
index 173c3b5c85..2970c6d265 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
@@ -39,11 +39,9 @@ if not intp.isAfterFlink114():
   from pyflink.dataset import *
   b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
   bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
-  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
-  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"))
-  st_env_2 = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"))
+  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment())
 else:
-  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"))
+  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment())
 
 
 from zeppelin_context import PyZeppelinContext
diff --git 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index d98d7e1b7b..1e9b1dcabb 100644
--- 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -86,9 +86,8 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
   private var btenv: TableEnvironment = _
   private var stenv: TableEnvironment = _
 
-  // TableEnvironment of flink planner
+  // TableEnvironment of flink planner (used for convert Flink table to 
DataSet)
   private var btenv_2: TableEnvironment = _
-  private var stenv_2: TableEnvironment = _
 
   // PyFlink depends on java version of TableEnvironment,
   // so need to create java version of TableEnvironment
@@ -96,9 +95,8 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
   private var java_btenv: TableEnvironment = _
   private var java_stenv: TableEnvironment = _
 
-  // java version of flink TableEnvironment
+  // java version TableEnvironment of old planner, used for converting Table 
to DataSet
   private var java_btenv_2: TableEnvironment = _
-  private var java_stenv_2: TableEnvironment = _
 
   private var z: FlinkZeppelinContext = _
   private var flinkVersion: FlinkVersion = _
@@ -448,15 +446,7 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
       if (!flinkVersion.isAfterFlink114()) {
         // flink planner is not supported after flink 1.14
         this.btenv_2 = tblEnvFactory.createScalaFlinkBatchTableEnvironment()
-        flinkILoop.bind("btenv_2", btenv_2.getClass().getCanonicalName(), 
btenv_2, List("@transient"))
-        stEnvSetting =
-          
EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build()
-        this.stenv_2 = 
tblEnvFactory.createScalaFlinkStreamTableEnvironment(stEnvSetting, 
getFlinkClassLoader)
-        flinkILoop.bind("stenv_2", stenv_2.getClass().getCanonicalName(), 
stenv_2, List("@transient"))
-
         this.java_btenv_2 = 
tblEnvFactory.createJavaFlinkBatchTableEnvironment()
-        btEnvSetting = 
EnvironmentSettings.newInstance.useOldPlanner.inStreamingMode.build
-        this.java_stenv_2 = 
tblEnvFactory.createJavaFlinkStreamTableEnvironment(btEnvSetting, 
getFlinkClassLoader)
       }
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader)
@@ -768,11 +758,8 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
       this.btenv_2
   }
 
-  def getStreamTableEnvironment(planner: String = "blink"): TableEnvironment = 
{
-    if (planner == "blink")
-      this.stenv
-    else
-      this.stenv_2
+  def getStreamTableEnvironment(): TableEnvironment = {
+    this.stenv
   }
 
   def getJavaBatchTableEnvironment(planner: String): TableEnvironment = {
@@ -783,12 +770,8 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
     }
   }
 
-  def getJavaStreamTableEnvironment(planner: String): TableEnvironment = {
-    if (planner == "blink") {
-      this.java_stenv
-    } else {
-      this.java_stenv_2
-    }
+  def getJavaStreamTableEnvironment(): TableEnvironment = {
+    this.java_stenv
   }
 
   def getDefaultParallelism = this.defaultParallelism

Reply via email to