This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e6c123  [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11
7e6c123 is described below

commit 7e6c1236ff84acd636325763eb86017e8a412d9c
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Oct 18 11:15:09 2021 +0800

    [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11
    
    ### What is this PR for?
    
    * Remove module `flink1.10-shims` & `flink1.11-shims`
    * Related tests are also removed
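
    For context: the interpreter picks a shim implementation purely from the detected Flink version (see the `FlinkShims.loadShims` change in this diff), so dropping 1.10/1.11 amounts to removing their branches and modules. A minimal, illustrative sketch of the dispatch shape after this change (only the `Flink112Shims` name is taken from this diff; the 1.13/1.14 class names are assumed):
    
    ```java
    // Hedged sketch, not the literal Zeppelin source: the shape of the
    // version dispatch in FlinkShims#loadShims once 1.10/1.11 are gone.
    public class ShimDispatchSketch {
        static String shimClassFor(int major, int minor) {
            if (major == 1 && minor == 12) {
                return "org.apache.zeppelin.flink.Flink112Shims";
            } else if (major == 1 && minor == 13) {
                return "org.apache.zeppelin.flink.Flink113Shims"; // assumed name
            } else if (major == 1 && minor == 14) {
                return "org.apache.zeppelin.flink.Flink114Shims"; // assumed name
            }
            // Flink 1.10 and 1.11 no longer have a branch here.
            throw new IllegalArgumentException("Flink " + major + "." + minor + " is not supported");
        }
    
        public static void main(String[] args) {
            System.out.println(shimClassFor(1, 12)); // prints the 1.12 shim class name
        }
    }
    ```
    
    In the real code the chosen class is loaded reflectively via `Class.forName` and instantiated with `(FlinkVersion, Properties)`, as the removed `Flink110Shims` constructor illustrates.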
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5553
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #4251 from zjffdu/ZEPPELIN-5553 and squashes the following commits:
    
    1160dc675f [Jeff Zhang] update
    929df448f9 [Jeff Zhang] [ZEPPELIN-5553] Remove support of flink 1.10 & 1.11
---
 .github/workflows/core.yml                         |   6 +-
 flink/README.md                                    |   3 +-
 flink/flink-scala-parent/pom.xml                   |  92 +---
 .../apache/zeppelin/flink/FlinkSqlInterpreter.java |  12 +-
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |   4 -
 .../java/org/apache/zeppelin/flink/JobManager.java |   1 +
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |   4 -
 .../org/apache/zeppelin/flink/TableEnvFactory.java |  58 +--
 .../src/main/resources/python/zeppelin_ipyflink.py |   9 +-
 .../src/main/resources/python/zeppelin_pyflink.py  |   9 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  16 +-
 .../zeppelin/flink/internal/FlinkShell.scala       |   3 -
 .../flink/FlinkBatchSqlInterpreterTest.java        |   4 -
 .../apache/zeppelin/flink/SqlInterpreterTest.java  | 212 ++++-----
 .../java/org/apache/zeppelin/flink/FlinkShims.java |   8 +-
 .../org/apache/zeppelin/flink/FlinkVersion.java    |   4 -
 flink/flink1.10-shims/pom.xml                      | 217 ---------
 .../org/apache/zeppelin/flink/Flink110Shims.java   | 389 ---------------
 .../flink/shims110/CollectStreamTableSink.java     | 102 ----
 .../flink/shims110/Flink110ScalaShims.scala        |  37 --
 flink/flink1.11-shims/pom.xml                      | 211 ---------
 .../org/apache/zeppelin/flink/Flink111Shims.java   | 526 ---------------------
 .../flink/shims111/CollectStreamTableSink.java     |  97 ----
 .../flink/shims111/Flink111ScalaShims.scala        |  36 --
 flink/pom.xml                                      |   4 -
 .../integration/FlinkIntegrationTest110.java       |  40 --
 .../integration/FlinkIntegrationTest111.java       |  40 --
 .../integration/ZSessionIntegrationTest.java       |   2 +-
 .../integration/ZeppelinFlinkClusterTest.java      |  15 +-
 .../integration/ZeppelinFlinkClusterTest112.java   |   8 +-
 ...st111.java => ZeppelinFlinkClusterTest113.java} |   9 +-
 ...st110.java => ZeppelinFlinkClusterTest114.java} |   9 +-
 .../src/test/resources/init_stream.scala           |   2 +-
 .../interpreter/RemoteInterpreterEventServer.java  |   1 +
 34 files changed, 140 insertions(+), 2050 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 1609ad1..866ed31 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -143,12 +143,13 @@ jobs:
           ./mvnw package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
         run: ./mvnw test -DskipRat -pl zeppelin-interpreter-integration 
-Pintegration -DfailIfNoTests=false 
-Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
+
   test-flink-and-flink-integration-test:
     runs-on: ubuntu-20.04
     strategy:
       fail-fast: false
       matrix:
-        flink: [110, 111, 112, 113, 114]
+        flink: [112, 113, 114]
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -180,7 +181,8 @@ jobs:
           ./mvnw install -DskipTests -DskipRat -am -pl 
flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration
 -Pflink-${{ matrix.flink }} -Pintegration -B
           ./mvnw clean package -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
-        run: ./mvnw test -DskipRat -pl 
flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration
 -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B 
-Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink 
}},ZeppelinFlinkClusterTest${{ matrix.flink }}
+        run: ./mvnw test -DskipRat -pl 
flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration
 -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B 
-Dtest=ZeppelinFlinkClusterTest${{ matrix.flink }}
+
   run-spark-intergration-test:
     runs-on: ubuntu-20.04
     steps:
diff --git a/flink/README.md b/flink/README.md
index e8e7dd9..53618ad 100644
--- a/flink/README.md
+++ b/flink/README.md
@@ -8,10 +8,9 @@ This is the doc for Zeppelin developers who want to work on 
flink interpreter.
 
 Flink interpreter is more complex than other interpreter (such as jdbc, 
shell). Currently it has following 8 modules
 * flink-shims
-* flink1.10-shims
-* flink1.11-shims
 * flink1.12-shims
 * flink1.13-shims
+* flink1.14-shims
 * flink-scala-parent
 * flink-scala-2.11
 * flink-scala-2.12
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index b51ed70..83a7ec7 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -36,7 +36,7 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>${flink1.10.version}</flink.version>
+    <flink.version>${flink1.12.version}</flink.version>
     <flink.hadoop.version>2.6.5</flink.hadoop.version>
     <hive.version>2.3.4</hive.version>
     <hiverunner.version>4.0.0</hiverunner.version>
@@ -56,18 +56,6 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink1.10-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink1.11-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
       <artifactId>flink1.12-shims</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -931,84 +919,6 @@
   <profiles>
 
     <profile>
-      <id>flink-110</id>
-      <properties>
-        <flink.version>${flink1.10.version}</flink.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.reflections</groupId>
-              <artifactId>reflections</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>flink-111</id>
-      <properties>
-        <flink.version>${flink1.11.version}</flink.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.reflections</groupId>
-              <artifactId>reflections</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
       <id>flink-112</id>
       <properties>
         <flink.version>${flink1.12.version}</flink.version>
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
index 97abfdc..c3ec4cd 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
@@ -341,11 +341,7 @@ public abstract class FlinkSqlInterpreter extends 
AbstractInterpreter {
   private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, 
InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      if (flinkInterpreter.getFlinkVersion().isFlink110()) {
-        this.tbenv.dropTemporaryView(sqlCommand.operands[0]);
-      } else {
-        flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
-      }
+      flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
@@ -357,11 +353,7 @@ public abstract class FlinkSqlInterpreter extends 
AbstractInterpreter {
   private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, 
InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      if (flinkInterpreter.getFlinkVersion().isFlink110()) {
-        this.tbenv.createTemporaryView(sqlCommand.operands[0], 
tbenv.sqlQuery(sqlCommand.operands[1]));
-      } else {
-        flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
-      }
+      flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 25ee255..12bda20 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -137,10 +137,6 @@ public class IPyFlinkInterpreter extends 
IPythonInterpreter {
     return flinkInterpreter.getProgress(context);
   }
 
-  public boolean isFlink110() {
-    return flinkInterpreter.getFlinkVersion().isFlink110();
-  }
-
   public org.apache.flink.api.java.ExecutionEnvironment 
getJavaExecutionEnvironment() {
     return flinkInterpreter.getExecutionEnvironment().getJavaEnv();
   }
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 49c803a..c957fa7 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -268,6 +268,7 @@ public class JobManager {
                 if (!StringUtils.isBlank(checkpointPath) && 
!checkpointPath.equals(latestCheckpointPath)) {
                   Map<String, String> config = new HashMap<>();
                   config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
+                  LOGGER.info("Update latest checkpoint path: {}", 
checkpointPath);
                   context.getIntpEventClient().updateParagraphConfig(
                           context.getNoteId(), context.getParagraphId(), 
config);
                   latestCheckpointPath = checkpointPath;
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 4162cc3..8c38d79 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -182,10 +182,6 @@ public class PyFlinkInterpreter extends PythonInterpreter {
     return flinkInterpreter.getProgress(context);
   }
 
-  public boolean isFlink110() {
-    return flinkInterpreter.getFlinkVersion().isFlink110();
-  }
-
   public boolean isAfterFlink114() {
     return flinkInterpreter.getFlinkVersion().isAfterFlink114();
   }
diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 6c080fd..1456a9f 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -104,14 +104,9 @@ public class TableEnvFactory {
 
   public TableEnvironment createScalaFlinkBatchTableEnvironment() {
     try {
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.scala.internal.BatchTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl");
-      }
+
       Constructor constructor = clazz
               .getConstructor(
                       org.apache.flink.api.scala.ExecutionEnvironment.class,
@@ -134,14 +129,8 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
-      }
 
       try {
         Constructor constructor = clazz
@@ -195,14 +184,8 @@ public class TableEnvFactory {
 
   public TableEnvironment createJavaFlinkBatchTableEnvironment() {
     try {
-      Class<?> clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class<?> clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
-      }
 
       Constructor con = clazz.getConstructor(
               ExecutionEnvironment.class,
@@ -229,14 +212,8 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-      }
 
       try {
         Constructor constructor = clazz
@@ -297,14 +274,8 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
-      }
       try {
         Constructor constructor = clazz
                 .getConstructor(
@@ -360,14 +331,9 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-      }
+
       try {
         Constructor constructor = clazz
                 .getConstructor(
@@ -424,14 +390,8 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = null;
-      if (flinkVersion.isFlink110()) {
-        clazz = Class
-                
.forName("org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl");
-      } else {
-        clazz = Class
+      Class clazz = Class
                 
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
-      }
       try {
         Constructor constructor = clazz.getConstructor(
                 CatalogManager.class,
diff --git 
a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py 
b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
index 9249453..367b318 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_ipyflink.py
@@ -46,14 +46,7 @@ pyflink.java_gateway.install_exception_handler()
 
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
 
-if intp.isFlink110():
-    from pyflink.dataset import *
-    b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
-    bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), 
True)
-    bt_env_2 = 
BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), False)
-    st_env = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), True)
-    st_env_2 = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-elif not intp.isAfterFlink114():
+if not intp.isAfterFlink114():
     from pyflink.dataset import *
     b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
     bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
diff --git 
a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py 
b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
index 06b99c9..173c3b5 100644
--- a/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/flink-scala-parent/src/main/resources/python/zeppelin_pyflink.py
@@ -35,14 +35,7 @@ pyflink.java_gateway.install_exception_handler()
 
 s_env = StreamExecutionEnvironment(intp.getJavaStreamExecutionEnvironment())
 
-if intp.isFlink110():
-  from pyflink.dataset import *
-  b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
-  bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"), 
True)
-  bt_env_2 = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("flink"), 
False)
-  st_env = StreamTableEnvironment(intp.getJavaStreamTableEnvironment("blink"), 
True)
-  st_env_2 = 
StreamTableEnvironment(intp.getJavaStreamTableEnvironment("flink"), False)
-elif not intp.isAfterFlink114():
+if not intp.isAfterFlink114():
   from pyflink.dataset import *
   b_env = 
pyflink.dataset.ExecutionEnvironment(intp.getJavaExecutionEnvironment())
   bt_env = BatchTableEnvironment(intp.getJavaBatchTableEnvironment("blink"))
diff --git 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 12051dd..bab5327 100644
--- 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -187,9 +187,6 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
         .replace("-", "_")
         .toUpperCase)
     if (ExecutionMode.isYarnAppicationMode(mode)) {
-      if (flinkVersion.isFlink110) {
-        throw new Exception("yarn-application mode is only supported after 
Flink 1.11")
-      }
       // use current yarn container working directory as FLINK_HOME, 
FLINK_CONF_DIR and HIVE_CONF_DIR
       val workingDirectory = new File(".").getAbsolutePath
       flinkHome = workingDirectory
@@ -197,9 +194,6 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
       hiveConfDir = workingDirectory
     }
     if (ExecutionMode.isK8sApplicationMode(mode)) {
-      if (flinkVersion.isFlink110) {
-        throw new Exception("application mode is only supported after Flink 
1.11")
-      }
       // use current pod working directory as FLINK_HOME
       val workingDirectory = new File(".").getAbsolutePath
       flinkHome = workingDirectory
@@ -417,14 +411,8 @@ abstract class FlinkScalaInterpreter(val properties: 
Properties,
       )
 
       flinkILoop.intp.interpret("import " + packageImports.mkString(", "))
-
-      if (flinkVersion.isFlink110) {
-        flinkILoop.intp.interpret("import org.apache.flink.table.api.scala._")
-      } else {
-        flinkILoop.intp.interpret("import org.apache.flink.table.api._")
-        flinkILoop.intp.interpret("import 
org.apache.flink.table.api.bridge.scala._")
-      }
-
+      flinkILoop.intp.interpret("import org.apache.flink.table.api._")
+      flinkILoop.intp.interpret("import 
org.apache.flink.table.api.bridge.scala._")
       flinkILoop.intp.interpret("import 
org.apache.flink.table.functions.ScalarFunction")
       flinkILoop.intp.interpret("import 
org.apache.flink.table.functions.AggregateFunction")
       flinkILoop.intp.interpret("import 
org.apache.flink.table.functions.TableFunction")
diff --git 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
index ab0f299..ae6d411 100644
--- 
a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
+++ 
b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
@@ -112,9 +112,6 @@ object FlinkShell {
       case None => (flinkConfig, None)
     }
 
-    // workaround for FLINK-17788, otherwise it won't work with flink 1.10.1 
which has been released.
-    flinkConfig.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME)
-
     val (effectiveConfig, _) = clusterClient match {
       case Some(_) => fetchDeployedYarnClusterInfo(config, clusterConfig, 
"yarn-cluster", flinkShims)
       case None => fetchDeployedYarnClusterInfo(config, clusterConfig, 
"default", flinkShims)
diff --git 
a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 875756f..e3526eb 100644
--- 
a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -406,10 +406,6 @@ public class FlinkBatchSqlInterpreterTest extends 
SqlInterpreterTest {
 
   @Test
   public void testFunctionHintRowType() throws InterpreterException, 
IOException {
-    if (flinkInterpreter.getFlinkVersion().isFlink110()) {
-      // Row Type hint is not supported in flink 1.10
-      return;
-    }
     // define table function with TableHint of Row return type
     InterpreterContext context = getInterpreterContext();
     InterpreterResult result = flinkInterpreter.interpret(
diff --git 
a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index e71ff93..12862d2 100644
--- 
a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ 
b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -368,15 +368,13 @@ public abstract class SqlInterpreterTest {
     assertEquals("table\nsource_table\n", resultMessages.get(0).getData());
 
     // create temporary view
-    if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret("create temporary view my_temp_view as 
select int_col from source_table", context);
-      assertEquals(result.toString(), Code.SUCCESS, result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      assertEquals(1, resultMessages.size());
-      assertEquals(Type.TEXT, resultMessages.get(0).getType());
-      assertEquals("View has been created.\n", 
resultMessages.get(0).getData());
-    }
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("create temporary view my_temp_view as 
select int_col from source_table", context);
+    assertEquals(result.toString(), Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(Type.TEXT, resultMessages.get(0).getType());
+    assertEquals("View has been created.\n", resultMessages.get(0).getData());
   }
 
   @Test
@@ -432,133 +430,107 @@ public abstract class SqlInterpreterTest {
 
   @Test
   public void testFunction() throws IOException, InterpreterException {
+    InterpreterContext context = getInterpreterContext();
 
-    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
-    if(!flinkVersion.isFlink110()){
-      InterpreterContext context = getInterpreterContext();
-
-      // CREATE UDF
-      InterpreterResult result = sqlInterpreter.interpret(
-              "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' 
;", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been created."));
-
-      // SHOW UDF
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret(
-              "SHOW FUNCTIONS ;", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf"));
+    // CREATE UDF
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' 
;", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been created."));
 
+    // SHOW UDF
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "SHOW FUNCTIONS ;", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf"));
 
-      // ALTER
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret(
-              "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; 
", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been modified."));
 
+    // ALTER
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; 
", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been modified."));
 
-      // DROP UDF
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been dropped."));
 
+    // DROP UDF
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function
 has been dropped."));
 
-      // SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf'
-      result = sqlInterpreter.interpret(
-              "SHOW FUNCTIONS ;", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      assertFalse(resultMessages.toString(), 
resultMessages.get(0).getData().contains("myudf"));
-    } else {
-      // Flink1.10 don't support ddl for function
-      assertTrue(flinkVersion.isFlink110());
-    }
 
+    // SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf'
+    result = sqlInterpreter.interpret(
+            "SHOW FUNCTIONS ;", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertFalse(resultMessages.toString(), 
resultMessages.get(0).getData().contains("myudf"));
   }
 
   @Test
   public void testCatalog() throws IOException, InterpreterException{
-    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
-
-    if (!flinkVersion.isFlink110()){
-      InterpreterContext context = getInterpreterContext();
+    InterpreterContext context = getInterpreterContext();
 
-      // CREATE CATALOG
-      InterpreterResult result = sqlInterpreter.interpret(
-              "CREATE CATALOG test_catalog \n" +
-                      "WITH( \n" +
-                      "'type'='generic_in_memory' \n" +
-                      ");", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog
 has been created."));
+    // CREATE CATALOG
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE CATALOG test_catalog \n" +
+                    "WITH( \n" +
+                    "'type'='generic_in_memory' \n" +
+                    ");", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog
 has been created."));
 
-      // USE CATALOG & SHOW DATABASES;
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret(
-              "USE CATALOG test_catalog ;\n" +
-                      "SHOW DATABASES;", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default"));
+    // USE CATALOG & SHOW DATABASES;
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "USE CATALOG test_catalog ;\n" +
+                    "SHOW DATABASES;", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default"));
 
-      // DROP CATALOG
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret(
-              "DROP CATALOG test_catalog ;\n", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog
 has been dropped."));
+    // DROP CATALOG
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "DROP CATALOG test_catalog ;\n", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog
 has been dropped."));
 
-      // SHOW CATALOG. Due to drop CATALOG before, it shouldn't contain 
'test_catalog'
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret(
-              "SHOW CATALOGS ;\n", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      resultMessages = context.out.toInterpreterResultMessage();
-      
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog"));
-      
assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog"));
-    } else {
-      // Flink1.10 don't support ddl for catalog
-      assertTrue(flinkVersion.isFlink110());
-    }
+    // SHOW CATALOG. Due to drop CATALOG before, it shouldn't contain 
'test_catalog'
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "SHOW CATALOGS ;\n", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog"));
+    
assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog"));
 
   }
 
   @Test
   public void testSetProperty() throws InterpreterException {
     FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = sqlInterpreter.interpret(
+            "set table.sql-dialect=hive", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
 
-    if (!flinkVersion.isFlink110()){
-      InterpreterContext context = getInterpreterContext();
-      InterpreterResult result = sqlInterpreter.interpret(
-              "set table.sql-dialect=hive", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-
-      sqlInterpreter.interpret("create table test_hive_table(a string, b 
int)\n" +
-              "partitioned by (dt string)", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-    } else {
-      // Flink1.10 doesn't support set table.sql-dialet which is introduced in 
flink 1.11
-      InterpreterContext context = getInterpreterContext();
-      InterpreterResult result = sqlInterpreter.interpret(
-              "set table.sql-dialect=hive", context);
-      assertEquals(context.out.toString(), Code.ERROR, result.code());
-      assertTrue(context.out.toString(),
-              context.out.toString().contains("table.sql-dialect is not a 
valid table/sql config"));
-    }
+    sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" 
+
+            "partitioned by (dt string)", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
 
     // table.local-time-zone is only available from 1.12
     if 
(flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.12.0"))) {
-      InterpreterContext context = getInterpreterContext();
-      InterpreterResult result = sqlInterpreter.interpret("SET 
'table.local-time-zone' = 'UTC'", context);
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", 
context);
       assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
     }
   }
@@ -566,20 +538,14 @@ public abstract class SqlInterpreterTest {
   @Test
   public void testShowModules() throws InterpreterException, IOException {
     FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+    InterpreterContext context = getInterpreterContext();
 
-    if (!flinkVersion.isFlink110()) {
-      InterpreterContext context = getInterpreterContext();
-
-      // CREATE CATALOG
-      InterpreterResult result = sqlInterpreter.interpret(
-              "show modules", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
-      List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
-      assertTrue(resultMessages.toString(), 
resultMessages.get(0).getData().contains("core"));
-    } else {
-      // Flink1.10 don't support show modules
-      assertTrue(flinkVersion.isFlink110());
-    }
+    // CREATE CATALOG
+    InterpreterResult result = sqlInterpreter.interpret(
+            "show modules", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+    List<InterpreterResultMessage> resultMessages = 
context.out.toInterpreterResultMessage();
+    assertTrue(resultMessages.toString(), 
resultMessages.get(0).getData().contains("core"));
   }
 
 
diff --git 
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index ba25ec9..640eba6 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -56,13 +56,7 @@ public abstract class FlinkShims {
   private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties 
properties)
       throws Exception {
     Class<?> flinkShimsClass;
-    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() 
== 10) {
-      LOGGER.info("Initializing shims for Flink 1.10");
-      flinkShimsClass = 
Class.forName("org.apache.zeppelin.flink.Flink110Shims");
-    } else if (flinkVersion.getMajorVersion() == 1 && 
flinkVersion.getMinorVersion() == 11) {
-      LOGGER.info("Initializing shims for Flink 1.11");
-      flinkShimsClass = 
Class.forName("org.apache.zeppelin.flink.Flink111Shims");
-    } else if (flinkVersion.getMajorVersion() == 1 && 
flinkVersion.getMinorVersion() == 12) {
+    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() 
== 12) {
       LOGGER.info("Initializing shims for Flink 1.12");
       flinkShimsClass = 
Class.forName("org.apache.zeppelin.flink.Flink112Shims");
     } else if (flinkVersion.getMajorVersion() == 1 && 
flinkVersion.getMinorVersion() == 13) {
diff --git 
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
index 2e1f47e..1c83645 100644
--- 
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
+++ 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkVersion.java
@@ -79,10 +79,6 @@ public class FlinkVersion {
     return new FlinkVersion(versionString);
   }
 
-  public boolean isFlink110() {
-    return this.majorVersion == 1 && minorVersion == 10;
-  }
-
   public boolean isAfterFlink114() {
     return newerThanOrEqual(FlinkVersion.fromVersionString("1.14.0"));
   }
diff --git a/flink/flink1.10-shims/pom.xml b/flink/flink1.10-shims/pom.xml
deleted file mode 100644
index 14d2dd5..0000000
--- a/flink/flink1.10-shims/pom.xml
+++ /dev/null
@@ -1,217 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>flink-parent</artifactId>
-        <groupId>org.apache.zeppelin</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>flink1.10-shims</artifactId>
-    <packaging>jar</packaging>
-    <name>Zeppelin: Flink1.10 Shims</name>
-
-    <properties>
-        <flink.version>${flink1.10.version}</flink.version>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.zeppelin</groupId>
-            <artifactId>flink-shims</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.11</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.reflections</groupId>
-                    <artifactId>reflections</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>eclipse-add-source</id>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile-first</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <scalaVersion>${flink.scala.version}</scalaVersion>
-                    <args>
-                        <arg>-unchecked</arg>
-                        <arg>-deprecation</arg>
-                        <arg>-feature</arg>
-                        <arg>-target:jvm-1.8</arg>
-                    </args>
-                    <jvmArgs>
-                        <jvmArg>-Xms1024m</jvmArg>
-                        <jvmArg>-Xmx1024m</jvmArg>
-                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
-                    </jvmArgs>
-                    <javacArgs>
-                        <javacArg>-source</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-target</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
-                    </javacArgs>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-interpreter-setting</id>
-                        <phase>none</phase>
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
diff --git 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
deleted file mode 100644
index 5711884..0000000
--- 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
-import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.util.ResourceUtil;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableUtils;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.scala.BatchTableEnvironment;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.EncodingUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims110.Flink110ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.regex.Matcher;
-
-
-/**
- * Shims for flink 1.10
- */
-public class Flink110Shims extends FlinkShims {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(Flink110Shims.class);
-  public static final AttributedString MESSAGE_HELP = new 
AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_TABLE, 
"Create table under current catalog and database."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.DROP_TABLE, "Drop 
table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] 
<name>;'"))
-          .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_VIEW, 
"Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS 
<query>;'"))
-          .append(formatCommand(SqlCommandParser.SqlCommand.DESCRIBE, 
"Describes the schema of a table with the given name."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.DROP_VIEW, 
"Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommandParser.SqlCommand.EXPLAIN, 
"Describes the execution plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.HELP, "Prints the 
available commands."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_INTO, 
"Inserts the results of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_OVERWRITE, 
"Inserts the results of a SQL SELECT query into a declared table sink and 
overwrite existing data."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.SELECT, "Executes 
a SQL SELECT query on the Flink cluster."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.SET, "Sets a 
session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for 
listing all properties."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_FUNCTIONS, 
"Shows all user-defined and built-in functions."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_TABLES, 
"Shows all registered tables."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.SOURCE, "Reads a 
SQL SELECT query from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommandParser.SqlCommand.USE_CATALOG, "Sets 
the current catalog. The current database is set to the catalog's default one. 
Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommandParser.SqlCommand.USE, "Sets the 
current default database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing 
(multi-line) statements.")
-          .toAttributedString();
-
-  public Flink110Shims(FlinkVersion flinkVersion, Properties properties) {
-    super(flinkVersion, properties);
-  }
-
-  @Override
-  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
-    ((ExecutionConfig) batchConfig).disableSysoutLogging();
-    ((ExecutionConfig) streamConfig).disableSysoutLogging();
-  }
-
-  @Override
-  public Object createStreamExecutionEnvironmentFactory(Object 
streamExecutionEnvironment) {
-    return new StreamExecutionEnvironmentFactory() {
-      @Override
-      public StreamExecutionEnvironment createExecutionEnvironment() {
-        return (StreamExecutionEnvironment) streamExecutionEnvironment;
-      }
-    };
-  }
-
-  @Override
-  public Object createCatalogManager(Object config) {
-    return new CatalogManager("default_catalog",
-            new GenericInMemoryCatalog("default_catalog", "default_database"));
-  }
-
-  @Override
-  public String getPyFlinkPythonPath(Properties properties) throws IOException 
{
-    String flinkHome = System.getenv("FLINK_HOME");
-    if (flinkHome != null) {
-      File tmpDir = Files.createTempDirectory("zeppelin").toFile();
-      List<File> depFiles = null;
-      try {
-        depFiles = 
ResourceUtil.extractBuiltInDependencies(tmpDir.getAbsolutePath(), "pyflink", 
true);
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-      StringBuilder builder = new StringBuilder();
-      for (File file : depFiles) {
-        LOGGER.info("Adding extracted file to PYTHONPATH: " + 
file.getAbsolutePath());
-        builder.append(file.getAbsolutePath() + ":");
-      }
-      return builder.toString();
-    } else {
-      throw new IOException("No FLINK_HOME is specified");
-    }
-  }
-
-  @Override
-  public Object getCollectStreamTableSink(InetAddress targetAddress, int 
targetPort, Object serializer) {
-    return new CollectStreamTableSink(targetAddress, targetPort, 
(TypeSerializer<Tuple2<Boolean, Row >>) serializer);
-  }
-
-  @Override
-  public List collectToList(Object table) throws Exception {
-    return TableUtils.collectToList((Table) table);
-  }
-
-  @Override
-  public void startMultipleInsert(Object tblEnv, InterpreterContext context) 
throws Exception {
-
-  }
-
-  @Override
-  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext 
context) throws Exception {
-    ((TableEnvironment) tblEnv).sqlUpdate(sql);
-  }
-
-  @Override
-  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, 
InterpreterContext context) throws Exception {
-    ((TableEnvironment) tblEnv).execute(jobName);
-    return true;
-  }
-
-  @Override
-  public boolean rowEquals(Object row1, Object row2) {
-    return ((Row)row1).equals((Row) row2);
-  }
-
-  public Object fromDataSet(Object btenv, Object ds) {
-    return Flink110ScalaShims.fromDataSet((BatchTableEnvironment) btenv, 
(DataSet) ds);
-  }
-
-  @Override
-  public Object toDataSet(Object btenv, Object table) {
-    return Flink110ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) 
table);
-  }
-
-  @Override
-  public void registerTableSink(Object stenv, String tableName, Object 
collectTableSink) {
-    ((TableEnvironment) stenv).registerTableSink(tableName, (TableSink) 
collectTableSink);
-  }
-
-  @Override
-  public void registerScalarFunction(Object btenv, String name, Object 
scalarFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(ScalarFunction) scalarFunction);
-  }
-
-  @Override
-  public void registerTableFunction(Object btenv, String name, Object 
tableFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(TableFunction) tableFunction);
-  }
-
-  @Override
-  public void registerAggregateFunction(Object btenv, String name, Object 
aggregateFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(AggregateFunction) aggregateFunction);
-  }
-
-  @Override
-  public void registerTableAggregateFunction(Object btenv, String name, Object 
tableAggregateFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, 
(TableAggregateFunction) tableAggregateFunction);
-  }
-
-  @Override
-  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, 
String stmt) {
-    // parse
-    for (SqlCommandParser.SqlCommand cmd : 
SqlCommandParser.SqlCommand.values()) {
-      if (cmd.pattern == null){
-        continue;
-      }
-      final Matcher matcher = cmd.pattern.matcher(stmt);
-      if (matcher.matches()) {
-        final String[] groups = new String[matcher.groupCount()];
-        for (int i = 0; i < groups.length; i++) {
-          groups[i] = matcher.group(i + 1);
-        }
-        if (cmd == SqlCommandParser.SqlCommand.EXPLAIN) {
-          String[] operands = cmd.operandConverter.apply(groups).get();
-          if (operands[0].equalsIgnoreCase("select")) {
-            // flink 1.10 only supports explaining SELECT statements.
-            String[] newOperands = new String[]{operands[0] + " " + 
operands[1]};
-            return Optional.of(new SqlCommandParser.SqlCommandCall(cmd, 
newOperands, stmt));
-          } else {
-            return Optional.empty();
-          }
-        } else {
-          return cmd.operandConverter.apply(groups)
-                  .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, 
operands, stmt));
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  @Override
-  public void executeSql(Object tableEnv, String sql) {
-    throw new RuntimeException("Should not be called for flink 1.10");
-  }
-
-  @Override
-  public String explain(Object tableEnv, String sql) {
-    Table table = ((TableEnvironment) tableEnv).sqlQuery(sql);
-    return ((TableEnvironment) tableEnv).explain(table);
-  }
-
-  @Override
-  public String sqlHelp() {
-    return MESSAGE_HELP.toString();
-  }
-
-  @Override
-  public void setCatalogManagerSchemaResolver(Object catalogManager,
-                                              Object parser,
-                                              Object environmentSetting) {
-    // do nothing for flink 1.10
-  }
-
-  @Override
-  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, 
Object effectiveConfig) {
-    CustomCommandLine customCommandLine = null;
-    try {
-      customCommandLine = ((CliFrontend) 
cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
-    } catch (NoSuchMethodError e) {
-      try {
-        Method method = 
CliFrontend.class.getMethod("getActiveCustomCommandLine", CommandLine.class);
-        customCommandLine = (CustomCommandLine) method.invoke((CliFrontend) 
cliFrontend, commandLine);
-      } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException ex) {
-        LOGGER.error("Fail to call getCustomCli", ex);
-        throw new RuntimeException("Fail to call getCustomCli", ex);
-      }
-    }
-    try {
-      return 
customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) 
commandLine);
-    } catch (FlinkException e) {
-      throw new RuntimeException("Fail to call 
applyCommandLineOptionsToConfiguration", e);
-    }
-  }
-
-  @Override
-  public Map extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    try {
-      configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    } catch (NoClassDefFoundError e) {
-      LOGGER.warn("No pyflink jars found");
-    }
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) 
field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
-  }
-
-  @Override
-  public String[] rowToString(Object row, Object table, Object tableConfig) {
-    return rowToString((Row) row);
-  }
-
-  private String[] rowToString(Row row) {
-    final String[] fields = new String[row.getArity()];
-    for (int i = 0; i < row.getArity(); i++) {
-      final Object field = row.getField(i);
-      if (field == null) {
-        fields[i] = "(NULL)";
-      } else {
-        fields[i] = EncodingUtils.objectToString(field);
-      }
-    }
-    return fields;
-  }
-
-  public boolean isTimeIndicatorType(Object type) {
-    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
-  }
-
-  private Object lookupExecutor(ClassLoader classLoader,
-                               Object settings,
-                               Object sEnv) {
-    try {
-      Map<String, String> executorProperties = ((EnvironmentSettings) 
settings).toExecutorProperties();
-      ExecutorFactory executorFactory = 
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-      Method createMethod = executorFactory.getClass()
-              .getMethod("create", Map.class, 
StreamExecutionEnvironment.class);
-
-      return (Executor) createMethod.invoke(
-              executorFactory,
-              executorProperties,
-              (StreamExecutionEnvironment) sEnv);
-    } catch (Exception e) {
-      throw new TableException(
-              "Could not instantiate the executor. Make sure a planner module 
is on the classpath",
-              e);
-    }
-  }
-
-  @Override
-  public ImmutablePair<Object, Object> createPlannerAndExecutor(
-          ClassLoader classLoader, Object environmentSettings, Object sEnv,
-          Object tableConfig, Object functionCatalog, Object catalogManager) {
-    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
-    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, 
plannerProperties)
-            .create(plannerProperties, executor, (TableConfig) tableConfig,
-                    (FunctionCatalog) functionCatalog,
-                    (CatalogManager) catalogManager);
-    return ImmutablePair.of(planner, executor);
-  }
-}
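
Note: the removed Flink110Shims collected query results through TableUtils.collectToList and explained queries via sqlQuery + explain(Table). On the Flink versions that keep shims (1.12+), the same collection goes through Table#execute().collect(). The sketch below is illustrative only (class name and query are made up for the example) and is not part of this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper, shown only to illustrate the 1.11+ collect path.
    public class CollectSketch {

      public static List<Row> collectToList(TableEnvironment tenv, String query) throws Exception {
        Table table = tenv.sqlQuery(query);
        List<Row> rows = new ArrayList<>();
        // Table#execute().collect() returns a CloseableIterator on Flink 1.11+.
        try (CloseableIterator<Row> it = table.execute().collect()) {
          it.forEachRemaining(rows::add);
        }
        return rows;
      }

      public static void main(String[] args) throws Exception {
        TableEnvironment tenv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        collectToList(tenv, "SELECT 1 AS id").forEach(System.out::println);
      }
    }
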
diff --git 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
deleted file mode 100644
index 925e3a7..0000000
--- 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims110;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.experimental.CollectSink;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-/**
- * Table sink for collecting the results locally using sockets.
- */
-public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectStreamTableSink.class);
-
-  private final InetAddress targetAddress;
-  private final int targetPort;
-  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
-
-  private String[] fieldNames;
-  private TypeInformation<?>[] fieldTypes;
-
-  public CollectStreamTableSink(InetAddress targetAddress,
-                                int targetPort,
-                                TypeSerializer<Tuple2<Boolean, Row>> 
serializer) {
-    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + 
targetPort);
-    this.targetAddress = targetAddress;
-    this.targetPort = targetPort;
-    this.serializer = serializer;
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return fieldNames;
-  }
-
-  @Override
-  public TypeInformation<?>[] getFieldTypes() {
-    return fieldTypes;
-  }
-
-  @Override
-  public CollectStreamTableSink configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
-    final CollectStreamTableSink copy =
-            new CollectStreamTableSink(targetAddress, targetPort, serializer);
-    copy.fieldNames = fieldNames;
-    copy.fieldTypes = fieldTypes;
-    return copy;
-  }
-
-  @Override
-  public TypeInformation<Row> getRecordType() {
-    return Types.ROW_NAMED(fieldNames, fieldTypes);
-  }
-
-  @Override
-  public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
-    consumeDataStream(stream);
-  }
-
-  @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> 
stream) {
-    // add sink
-    return stream
-            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
-            .name("Zeppelin Flink Sql Stream Collect Sink " + 
UUID.randomUUID())
-            .setParallelism(1);
-  }
-
-  @Override
-  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
-  }
-}
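
The sink above streamed retract rows back to the interpreter over a socket, using Flink's experimental CollectSink with parallelism 1. The general pattern it wrapped looks roughly like the sketch below; this class is illustrative only and not code from this repository (the real sink serialized rows with a TypeSerializer rather than toString()):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    import java.io.PrintWriter;
    import java.net.InetAddress;
    import java.net.Socket;

    // Hypothetical socket sink, shown only to illustrate the pattern.
    public class SimpleSocketRowSink extends RichSinkFunction<Row> {

      private final InetAddress targetAddress;
      private final int targetPort;
      private transient Socket socket;
      private transient PrintWriter writer;

      public SimpleSocketRowSink(InetAddress targetAddress, int targetPort) {
        this.targetAddress = targetAddress;
        this.targetPort = targetPort;
      }

      @Override
      public void open(Configuration parameters) throws Exception {
        // Connect back to the client that is waiting for results.
        socket = new Socket(targetAddress, targetPort);
        writer = new PrintWriter(socket.getOutputStream(), true);
      }

      @Override
      public void invoke(Row value, Context context) {
        // One line per row; a real implementation would use a binary serializer.
        writer.println(value.toString());
      }

      @Override
      public void close() throws Exception {
        if (writer != null) writer.close();
        if (socket != null) socket.close();
      }
    }
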
diff --git 
a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
 
b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
deleted file mode 100644
index 9be7b8a..0000000
--- 
a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims110
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.api.scala.BatchTableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala._
-
-
-object Flink110ScalaShims {
-
-  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
-    btenv.fromDataSet(ds)
-  }
-
-  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
-    btenv.toDataSet[Row](table)
-  }
-}
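
The Scala shim above only bridged between DataSet and Table for the legacy batch environment. For the stream-first API that the remaining shims target, the analogous bridge is StreamTableEnvironment.fromDataStream / toDataStream (the latter available from Flink 1.13 onward). A rough Java sketch, with names chosen purely for illustration:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    // Illustrative only; assumes Flink 1.13+ for toDataStream.
    public class StreamBridgeSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        DataStream<String> words = env.fromElements("hello", "flink");
        // DataStream -> Table
        Table table = tenv.fromDataStream(words);
        // Table -> DataStream, replacing the removed toDataSet bridge
        DataStream<Row> rows = tenv.toDataStream(table);
        rows.print();
        env.execute("bridge-sketch");
      }
    }
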
diff --git a/flink/flink1.11-shims/pom.xml b/flink/flink1.11-shims/pom.xml
deleted file mode 100644
index 04dbf01..0000000
--- a/flink/flink1.11-shims/pom.xml
+++ /dev/null
@@ -1,211 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <parent>
-        <artifactId>flink-parent</artifactId>
-        <groupId>org.apache.zeppelin</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>flink1.11-shims</artifactId>
-    <packaging>jar</packaging>
-    <name>Zeppelin: Flink1.11 Shims</name>
-
-    <properties>
-        <flink.version>${flink1.11.version}</flink.version>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.zeppelin</groupId>
-            <artifactId>flink-shims</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.reflections</groupId>
-                    <artifactId>reflections</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>eclipse-add-source</id>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile-first</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <scalaVersion>${flink.scala.version}</scalaVersion>
-                    <args>
-                        <arg>-unchecked</arg>
-                        <arg>-deprecation</arg>
-                        <arg>-feature</arg>
-                        <arg>-target:jvm-1.8</arg>
-                    </args>
-                    <jvmArgs>
-                        <jvmArg>-Xms1024m</jvmArg>
-                        <jvmArg>-Xmx1024m</jvmArg>
-                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
-                    </jvmArgs>
-                    <javacArgs>
-                        <javacArg>-source</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-target</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
-                    </javacArgs>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-interpreter-setting</id>
-                        <phase>none</phase>
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
diff --git 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
deleted file mode 100644
index 64979fd..0000000
--- 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
-import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
-import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.PrintUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-
-
-/**
- * Shims for flink 1.11
- */
-public class Flink111Shims extends FlinkShims {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(Flink111Shims.class);
-  public static final AttributedString MESSAGE_HELP = new 
AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under 
current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with 
optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual 
table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of 
a table with the given name."))
-          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously 
created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution 
plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommand.HELP, "Prints the available 
commands."))
-          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results 
of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the 
results of a SQL SELECT query into a declared table sink and overwrite existing 
data."))
-          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT 
query on the Flink cluster."))
-          .append(formatCommand(SqlCommand.SET, "Sets a session configuration 
property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all 
properties."))
-          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all 
user-defined and built-in functions."))
-          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered 
tables."))
-          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query 
from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current 
catalog. The current database is set to the catalog's default one. 
Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommand.USE, "Sets the current default 
database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing 
(multi-line) statements.")
-          .toAttributedString();
-
-  private Map<String, StatementSet> statementSetMap = new 
ConcurrentHashMap<>();
-
-  public Flink111Shims(FlinkVersion flinkVersion, Properties properties) {
-    super(flinkVersion, properties);
-  }
-  @Override
-  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
-    ((ExecutionConfig) batchConfig).disableSysoutLogging();
-    ((ExecutionConfig) streamConfig).disableSysoutLogging();
-  }
-
-  @Override
-  public Object createStreamExecutionEnvironmentFactory(Object 
streamExecutionEnvironment) {
-    return new StreamExecutionEnvironmentFactory() {
-      @Override
-      public StreamExecutionEnvironment createExecutionEnvironment() {
-        return (StreamExecutionEnvironment) streamExecutionEnvironment;
-      }
-    };
-  }
-
-  @Override
-  public Object createCatalogManager(Object config) {
-    return CatalogManager.newBuilder()
-            .classLoader(Thread.currentThread().getContextClassLoader())
-            .config((ReadableConfig) config)
-            .defaultCatalog("default_catalog",
-                    new GenericInMemoryCatalog("default_catalog", 
"default_database"))
-            .build();
-  }
-
-  @Override
-  public String getPyFlinkPythonPath(Properties properties) throws IOException 
{
-    String flinkHome = System.getenv("FLINK_HOME");
-    if (flinkHome != null) {
-      List<File> depFiles = null;
-      depFiles = Arrays.asList(new File(flinkHome + 
"/opt/python").listFiles());
-      StringBuilder builder = new StringBuilder();
-      for (File file : depFiles) {
-        LOGGER.info("Adding extracted file to PYTHONPATH: " + 
file.getAbsolutePath());
-        builder.append(file.getAbsolutePath() + ":");
-      }
-      return builder.toString();
-    } else {
-      throw new IOException("No FLINK_HOME is specified");
-    }
-  }
-
-  @Override
-  public Object getCollectStreamTableSink(InetAddress targetAddress, int 
targetPort, Object serializer) {
-    return new CollectStreamTableSink(targetAddress, targetPort, 
(TypeSerializer<Tuple2<Boolean, Row>>) serializer);
-  }
-
-  @Override
-  public List collectToList(Object table) throws Exception {
-    return Lists.newArrayList(((Table) table).execute().collect());
-  }
-
-  @Override
-  public void startMultipleInsert(Object tblEnv, InterpreterContext context) 
throws Exception {
-    StatementSet statementSet = ((TableEnvironment) 
tblEnv).createStatementSet();
-    statementSetMap.put(context.getParagraphId(), statementSet);
-  }
-
-  @Override
-  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext 
context) throws Exception {
-    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
-  }
-
-  @Override
-  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, 
InterpreterContext context) throws Exception {
-    JobClient jobClient = 
statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while (!jobClient.getJobStatus().get().isTerminalState()) {
-      LOGGER.debug("Wait for job to finish");
-      Thread.sleep(1000 * 5);
-    }
-    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
-      context.out.write("Job is cancelled.\n");
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public boolean rowEquals(Object row1, Object row2) {
-    Row r1 = (Row) row1;
-    Row r2 = (Row) row2;
-    r1.setKind(RowKind.INSERT);
-    r2.setKind(RowKind.INSERT);
-    return r1.equals(r2);
-  }
-
-  @Override
-  public Object fromDataSet(Object btenv, Object ds) {
-    return Flink111ScalaShims.fromDataSet((BatchTableEnvironment) btenv, 
(DataSet) ds);
-  }
-
-  @Override
-  public Object toDataSet(Object btenv, Object table) {
-    return Flink111ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) 
table);
-  }
-
-  @Override
-  public void registerTableSink(Object stenv, String tableName, Object 
collectTableSink) {
-    ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv)
-            .registerTableSinkInternal(tableName, (TableSink) 
collectTableSink);
-  }
-
-  @Override
-  public void registerScalarFunction(Object btenv, String name, Object 
scalarFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, 
(ScalarFunction) scalarFunction);
-  }
-
-  @Override
-  public void registerTableFunction(Object btenv, String name, Object 
tableFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, 
(TableFunction) tableFunction);
-  }
-
-  @Override
-  public void registerAggregateFunction(Object btenv, String name, Object 
aggregateFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, 
(AggregateFunction) aggregateFunction);
-  }
-
-  @Override
-  public void registerTableAggregateFunction(Object btenv, String name, Object 
tableAggregateFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, 
(TableAggregateFunction) tableAggregateFunction);
-  }
-
-  /**
-   * Parses the statement via regex matching first, then falls back to the Flink SqlParser.
-   *
-   * @param tableEnv
-   * @param stmt
-   * @return
-   */
-  @Override
-  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, 
String stmt) {
-    Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
-    SqlCommandCall sqlCommandCall = null;
-    try {
-      // parse statement via regex matching first
-      Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
-      if (callOpt.isPresent()) {
-        sqlCommandCall = callOpt.get();
-      } else {
-        sqlCommandCall = parseBySqlParser(sqlParser, stmt);
-      }
-    } catch (Exception e) {
-      return Optional.empty();
-    }
-    return Optional.of(sqlCommandCall);
-
-  }
-
-  private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) 
throws Exception {
-    List<Operation> operations;
-    try {
-      operations = sqlParser.parse(stmt);
-    } catch (Throwable e) {
-      throw new Exception("Invalidate SQL statement.", e);
-    }
-    if (operations.size() != 1) {
-      throw new Exception("Only single statement is supported now.");
-    }
-
-    final SqlCommand cmd;
-    String[] operands = new String[]{stmt};
-    Operation operation = operations.get(0);
-    if (operation instanceof CatalogSinkModifyOperation) {
-      boolean overwrite = ((CatalogSinkModifyOperation) 
operation).isOverwrite();
-      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
-    } else if (operation instanceof CreateTableOperation) {
-      cmd = SqlCommand.CREATE_TABLE;
-    } else if (operation instanceof DropTableOperation) {
-      cmd = SqlCommand.DROP_TABLE;
-    } else if (operation instanceof AlterTableOperation) {
-      cmd = SqlCommand.ALTER_TABLE;
-    } else if (operation instanceof CreateViewOperation) {
-      cmd = SqlCommand.CREATE_VIEW;
-    } else if (operation instanceof DropViewOperation) {
-      cmd = SqlCommand.DROP_VIEW;
-    } else if (operation instanceof CreateDatabaseOperation) {
-      cmd = SqlCommand.CREATE_DATABASE;
-    } else if (operation instanceof DropDatabaseOperation) {
-      cmd = SqlCommand.DROP_DATABASE;
-    } else if (operation instanceof AlterDatabaseOperation) {
-      cmd = SqlCommand.ALTER_DATABASE;
-    } else if (operation instanceof CreateCatalogOperation) {
-      cmd = SqlCommand.CREATE_CATALOG;
-    } else if (operation instanceof DropCatalogOperation) {
-      cmd = SqlCommand.DROP_CATALOG;
-    } else if (operation instanceof UseCatalogOperation) {
-      cmd = SqlCommand.USE_CATALOG;
-      operands = new String[]{((UseCatalogOperation) 
operation).getCatalogName()};
-    } else if (operation instanceof UseDatabaseOperation) {
-      cmd = SqlCommand.USE;
-      operands = new String[]{((UseDatabaseOperation) 
operation).getDatabaseName()};
-    } else if (operation instanceof ShowCatalogsOperation) {
-      cmd = SqlCommand.SHOW_CATALOGS;
-      operands = new String[0];
-    } else if (operation instanceof ShowDatabasesOperation) {
-      cmd = SqlCommand.SHOW_DATABASES;
-      operands = new String[0];
-    } else if (operation instanceof ShowTablesOperation) {
-      cmd = SqlCommand.SHOW_TABLES;
-      operands = new String[0];
-    } else if (operation instanceof ShowFunctionsOperation) {
-      cmd = SqlCommand.SHOW_FUNCTIONS;
-      operands = new String[0];
-    } else if (operation instanceof CreateCatalogFunctionOperation ||
-            operation instanceof CreateTempSystemFunctionOperation) {
-      cmd = SqlCommand.CREATE_FUNCTION;
-    } else if (operation instanceof DropCatalogFunctionOperation ||
-            operation instanceof DropTempSystemFunctionOperation) {
-      cmd = SqlCommand.DROP_FUNCTION;
-    } else if (operation instanceof AlterCatalogFunctionOperation) {
-      cmd = SqlCommand.ALTER_FUNCTION;
-    } else if (operation instanceof ExplainOperation) {
-      cmd = SqlCommand.EXPLAIN;
-    } else if (operation instanceof DescribeTableOperation) {
-      cmd = SqlCommand.DESCRIBE;
-      operands = new String[]{((DescribeTableOperation) 
operation).getSqlIdentifier().asSerializableString()};
-    } else if (operation instanceof QueryOperation) {
-      cmd = SqlCommand.SELECT;
-    } else {
-      throw new Exception("Unknown operation: " + operation.asSummaryString());
-    }
-
-    return new SqlCommandCall(cmd, operands, stmt);
-  }
-
-  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
-    // parse statement via regex matching
-    for (SqlCommand cmd : SqlCommand.values()) {
-      if (cmd.pattern != null) {
-        final Matcher matcher = cmd.pattern.matcher(stmt);
-        if (matcher.matches()) {
-          final String[] groups = new String[matcher.groupCount()];
-          for (int i = 0; i < groups.length; i++) {
-            groups[i] = matcher.group(i + 1);
-          }
-          return cmd.operandConverter.apply(groups)
-                  .map((operands) -> {
-                    String[] newOperands = operands;
-                    if (cmd == SqlCommand.EXPLAIN) {
-                      // convert `explain xx` to `explain plan for xx`
-                      // which can be executed through the executeSql method
-                      newOperands = new String[]{"EXPLAIN PLAN FOR " + 
operands[0] + " " + operands[1]};
-                    }
-                    return new SqlCommandCall(cmd, newOperands, stmt);
-                  });
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  @Override
-  public void executeSql(Object tableEnv, String sql) {
-    ((TableEnvironment) tableEnv).executeSql(sql);
-  }
-
-  @Override
-  public String explain(Object tableEnv, String sql) {
-    TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
-    return tableResult.collect().next().getField(0).toString();
-  }
-
-  @Override
-  public String sqlHelp() {
-    return MESSAGE_HELP.toString();
-  }
-
-  /**
-   * Flink 1.11 binds the CatalogManager to the parser, which means the blink and flink planners cannot share the same CatalogManager.
-   * This is a workaround that always resets the CatalogTableSchemaResolver before running any flink code.
-   * @param catalogManager
-   * @param parserObject
-   * @param environmentSetting
-   */
-  @Override
-  public void setCatalogManagerSchemaResolver(Object catalogManager,
-                                              Object parserObject,
-                                              Object environmentSetting) {
-    ((CatalogManager) catalogManager).setCatalogTableSchemaResolver(
-            new CatalogTableSchemaResolver((Parser)parserObject,
-                    
((EnvironmentSettings)environmentSetting).isStreamingMode()));
-  }
-
-  @Override
-  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, 
Object effectiveConfig) {
-    CustomCommandLine customCommandLine = 
((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) 
commandLine);
-    try {
-      return 
customCommandLine.applyCommandLineOptionsToConfiguration((CommandLine) 
commandLine);
-    } catch (FlinkException e) {
-      throw new RuntimeException("Fail to call 
applyCommandLineOptionsToConfiguration", e);
-    }
-  }
-
-  @Override
-  public Map extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    try {
-      configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    } catch (NoClassDefFoundError e) {
-      LOGGER.warn("No pyflink jars found");
-    }
-    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) 
field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
-  }
-
-  @Override
-  public String[] rowToString(Object row, Object table, Object tableConfig) {
-    return PrintUtils.rowToString((Row) row);
-  }
-
-  public boolean isTimeIndicatorType(Object type) {
-    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
-  }
-
-  private Object lookupExecutor(ClassLoader classLoader,
-                                Object settings,
-                                Object sEnv) {
-    try {
-      Map<String, String> executorProperties = ((EnvironmentSettings) 
settings).toExecutorProperties();
-      ExecutorFactory executorFactory = 
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-      Method createMethod = executorFactory.getClass()
-              .getMethod("create", Map.class, 
StreamExecutionEnvironment.class);
-
-      return (Executor) createMethod.invoke(
-              executorFactory,
-              executorProperties,
-              (StreamExecutionEnvironment) sEnv);
-    } catch (Exception e) {
-      throw new TableException(
-              "Could not instantiate the executor. Make sure a planner module 
is on the classpath",
-              e);
-    }
-  }
-
-  @Override
-  public ImmutablePair<Object, Object> createPlannerAndExecutor(
-          ClassLoader classLoader, Object environmentSettings, Object sEnv,
-          Object tableConfig, Object functionCatalog, Object catalogManager) {
-    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
-    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, 
plannerProperties)
-            .create(plannerProperties, executor, (TableConfig) tableConfig,
-                    (FunctionCatalog) functionCatalog,
-                    (CatalogManager) catalogManager);
-    return ImmutablePair.of(planner, executor);
-
-  }
-}
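
The multi-insert path of the removed Flink111Shims, a StatementSet plus polling the JobClient until a terminal state, is the pattern presumably carried forward by the remaining shims. A condensed, self-contained sketch of that flow (method name and sleep interval are illustrative):

    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    // Illustrative only: runs several INSERT statements as one job and waits for it to finish.
    public class MultiInsertSketch {

      public static boolean runInserts(TableEnvironment tenv, String... insertSqls) throws Exception {
        StatementSet statementSet = tenv.createStatementSet();
        for (String sql : insertSqls) {
          statementSet.addInsertSql(sql);
        }
        JobClient jobClient = statementSet.execute().getJobClient().get();
        // Poll until the job reaches a terminal state.
        while (!jobClient.getJobStatus().get().isTerminalState()) {
          Thread.sleep(5000);
        }
        return jobClient.getJobStatus().get() != JobStatus.CANCELED;
      }
    }
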
diff --git 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
deleted file mode 100644
index b98f406..0000000
--- 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims111;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.experimental.CollectSink;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-/**
- * Table sink for collecting the results locally using sockets.
- */
-public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(CollectStreamTableSink.class);
-
-  private final InetAddress targetAddress;
-  private final int targetPort;
-  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
-
-  private String[] fieldNames;
-  private TypeInformation<?>[] fieldTypes;
-
-  public CollectStreamTableSink(InetAddress targetAddress,
-                                int targetPort,
-                                TypeSerializer<Tuple2<Boolean, Row>> 
serializer) {
-    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + 
targetPort);
-    this.targetAddress = targetAddress;
-    this.targetPort = targetPort;
-    this.serializer = serializer;
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return fieldNames;
-  }
-
-  @Override
-  public TypeInformation<?>[] getFieldTypes() {
-    return fieldTypes;
-  }
-
-  @Override
-  public CollectStreamTableSink configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
-    final CollectStreamTableSink copy =
-            new CollectStreamTableSink(targetAddress, targetPort, serializer);
-    copy.fieldNames = fieldNames;
-    copy.fieldTypes = fieldTypes;
-    return copy;
-  }
-
-  @Override
-  public TypeInformation<Row> getRecordType() {
-    return Types.ROW_NAMED(fieldNames, fieldTypes);
-  }
-
-  @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> 
stream) {
-    // add sink
-    return stream
-            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
-            .name("Zeppelin Flink Sql Stream Collect Sink " + 
UUID.randomUUID())
-            .setParallelism(1);
-  }
-
-  @Override
-  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
-  }
-}
diff --git 
a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala
 
b/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala
deleted file mode 100644
index abdaca2..0000000
--- 
a/flink/flink1.11-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink111ScalaShims.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims111
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
-import org.apache.flink.types.Row
-
-object Flink111ScalaShims {
-
-  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
-    btenv.fromDataSet(ds)
-  }
-
-  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
-    btenv.toDataSet[Row](table)
-  }
-}
diff --git a/flink/pom.xml b/flink/pom.xml
index 329f79e..02a9ec5 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -37,16 +37,12 @@
         <module>flink-scala-2.11</module>
         <module>flink-scala-2.12</module>
         <module>flink-shims</module>
-        <module>flink1.10-shims</module>
-        <module>flink1.11-shims</module>
         <module>flink1.12-shims</module>
         <module>flink1.13-shims</module>
         <module>flink1.14-shims</module>
     </modules>
 
     <properties>
-        <flink1.10.version>1.10.3</flink1.10.version>
-        <flink1.11.version>1.11.3</flink1.11.version>
         <flink1.12.version>1.12.4</flink1.12.version>
         <flink1.13.version>1.13.2</flink1.13.version>
         <flink1.14.version>1.14.0</flink1.14.version>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
deleted file mode 100644
index 21f5292..0000000
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest110.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest110 extends FlinkIntegrationTest {
-
-  @Parameterized.Parameters
-  public static List<Object[]> data() {
-    return Arrays.asList(new Object[][]{
-            {"1.10.3", "2.11"},
-            {"1.10.3", "2.12"}
-    });
-  }
-
-  public FlinkIntegrationTest110(String flinkVersion, String scalaVersion) {
-    super(flinkVersion, scalaVersion);
-  }
-}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
deleted file mode 100644
index 66fe6d8..0000000
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest111.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest111 extends FlinkIntegrationTest {
-
-  @Parameterized.Parameters
-  public static List<Object[]> data() {
-    return Arrays.asList(new Object[][]{
-            {"1.11.3", "2.11"},
-            {"1.11.3", "2.12"}
-    });
-  }
-
-  public FlinkIntegrationTest111(String flinkVersion, String scalaVersion) {
-    super(flinkVersion, scalaVersion);
-  }
-}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 0fe8673..18d5b73 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -69,7 +69,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
 
     notebook = TestUtils.getInstance(Notebook.class);
     sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
-    flinkHome = DownloadUtils.downloadFlink("1.10.1", "2.11");
+    flinkHome = DownloadUtils.downloadFlink("1.12.4", "2.11");
   }
 
   @AfterClass
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
index 8566803..f43c34b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java
@@ -29,6 +29,7 @@ import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.utils.TestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +46,11 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi {
   private String flinkVersion;
   private String flinkHome;
 
-  public ZeppelinFlinkClusterTest(String flinkVersion) throws Exception {
+  public ZeppelinFlinkClusterTest(String flinkVersion, String scalaVersion) throws Exception {
     this.flinkVersion = flinkVersion;
     LOGGER.info("Testing FlinkVersion: " + flinkVersion);
-    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, "2.11");
+    LOGGER.info("Testing ScalaVersion: " + scalaVersion);
+    this.flinkHome = DownloadUtils.downloadFlink(flinkVersion, scalaVersion);
   }
 
   @BeforeClass
@@ -63,6 +65,7 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi {
     AbstractTestRestApi.shutDown();
   }
 
+  // TODO(zjffdu) Disabled temporarily
   //@Test
   public void testResumeFromCheckpoint() throws Exception {
 
@@ -96,9 +99,11 @@ public abstract class ZeppelinFlinkClusterTest extends AbstractTestRestApi {
       note.run(p2.getId(), false);
       p2.waitUntilRunning();
 
-      Thread.sleep(30 * 1000);
-      TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
-              .getInterpreterSettingByName("flink").close();
+      Thread.sleep(60 * 1000);
+      p2.abort();
+
+      // Sleep 5 seconds to ensure checkpoint info is written to note file
+      Thread.sleep(5 * 1000);
       assertTrue(p2.getConfig().toString(), p2.getConfig().get("latest_checkpoint_path").toString().contains(checkpointPath));
 
       // run it again
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
index 443b254..d0c6b46 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.integration;
 
+import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
@@ -28,11 +29,12 @@ public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.12.0"}
+            {"1.12.4", "2.11"},
+            {"1.12.4", "2.12"}
     });
   }
 
-  public ZeppelinFlinkClusterTest112(String flinkVersion) throws Exception {
-    super(flinkVersion);
+  public ZeppelinFlinkClusterTest112(String flinkVersion, String scalaVersion) throws Exception {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java
similarity index 81%
rename from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java
rename to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java
index 7b1ba0f..cc8dcd9 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest111.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest113.java
@@ -24,16 +24,17 @@ import java.util.Arrays;
 import java.util.List;
 
 //@RunWith(value = Parameterized.class)
-public class ZeppelinFlinkClusterTest111 extends ZeppelinFlinkClusterTest {
+public class ZeppelinFlinkClusterTest113 extends ZeppelinFlinkClusterTest {
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.11.3"}
+            {"1.13.2", "2.11"},
+            {"1.13.2", "2.12"}
     });
   }
 
-  public ZeppelinFlinkClusterTest111(String flinkVersion) throws Exception {
-    super(flinkVersion);
+  public ZeppelinFlinkClusterTest113(String flinkVersion, String scalaVersion) throws Exception {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java
similarity index 81%
rename from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
rename to zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java
index 4400706..1668491 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest110.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest114.java
@@ -24,16 +24,17 @@ import java.util.Arrays;
 import java.util.List;
 
 //@RunWith(value = Parameterized.class)
-public class ZeppelinFlinkClusterTest110 extends ZeppelinFlinkClusterTest {
+public class ZeppelinFlinkClusterTest114 extends ZeppelinFlinkClusterTest {
 
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"1.10.2"}
+            {"1.14.0", "2.11"},
+            {"1.14.0", "2.12"}
     });
   }
 
-  public ZeppelinFlinkClusterTest110(String flinkVersion) throws Exception {
-    super(flinkVersion);
+  public ZeppelinFlinkClusterTest114(String flinkVersion, String scalaVersion) throws Exception {
+    super(flinkVersion, scalaVersion);
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/resources/init_stream.scala b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala
index e4153be..f8d27ae 100644
--- a/zeppelin-interpreter-integration/src/test/resources/init_stream.scala
+++ b/zeppelin-interpreter-integration/src/test/resources/init_stream.scala
@@ -6,7 +6,7 @@ import java.util.Collections
 import scala.collection.JavaConversions._
 
 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-senv.enableCheckpointing(15000)
+senv.enableCheckpointing(5000)
 
 val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
 
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index 7aa6d6c..b8cd73f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -556,6 +556,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
                                     Map<String, String> config)
           throws InterpreterRPCException, TException {
     try {
+      LOGGER.info("Update paragraph config");
       Note note = interpreterSettingManager.getNotebook().getNote(noteId);
       note.getParagraph(paragraphId).updateConfig(config);
       interpreterSettingManager.getNotebook().saveNote(note, AuthenticationInfo.ANONYMOUS);
