This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 11052f6fb4 [ZEPPELIN-5846] Remove support flink 1.12 (#4558)
11052f6fb4 is described below

commit 11052f6fb435c812788a5f30211b9d71842f9182
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Feb 6 14:36:14 2023 +0800

    [ZEPPELIN-5846] Remove support flink 1.12 (#4558)
    
    * save
    
    * Fix flink test
    
    * [ZEPPELIN-5846] Remove support of flink 1.12
---
 .github/workflows/core.yml                         |   2 +-
 docs/interpreter/flink.md                          |   1 -
 flink/README.md                                    |   3 +-
 flink/flink-scala-parent/pom.xml                   |  53 +-
 .../java/org/apache/zeppelin/flink/FlinkShims.java |   5 +-
 flink/flink1.12-shims/pom.xml                      | 211 --------
 .../org/apache/zeppelin/flink/Flink112Shims.java   | 360 -------------
 .../zeppelin/flink/Flink112SqlInterpreter.java     | 584 ---------------------
 .../flink/shims112/CollectStreamTableSink.java     |  97 ----
 .../zeppelin/flink/shims112/SqlCommandParser.java  | 355 -------------
 .../flink/shims112/Flink112ScalaShims.scala        |  36 --
 flink/pom.xml                                      |  12 -
 .../integration/FlinkIntegrationTest112.java       |  40 --
 .../integration/ZSessionIntegrationTest.java       |   2 +-
 .../integration/ZeppelinFlinkClusterTest112.java   |  40 --
 15 files changed, 6 insertions(+), 1795 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 76380103da..d4cb767c90 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -233,7 +233,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        flink: [112, 113, 114, 115, 116]
+        flink: [113, 114, 115, 116]
     steps:
       - name: Checkout
         uses: actions/checkout@v3
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 0241ff9baf..0d70ef6491 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -151,7 +151,6 @@ Flink 1.15 is scala free and has changed its binary distribution. If you would l
 * Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
 * Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib
 
-
 ## Flink on Zeppelin Architecture
 
 <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/flink_architecture.png">
diff --git a/flink/README.md b/flink/README.md
index 53618ad1b3..bb8aa74a06 100644
--- a/flink/README.md
+++ b/flink/README.md
@@ -8,9 +8,10 @@ This is the doc for Zeppelin developers who want to work on flink interpreter.
 
 Flink interpreter is more complex than other interpreter (such as jdbc, shell). Currently it has following 8 modules
 * flink-shims
-* flink1.12-shims
 * flink1.13-shims
 * flink1.14-shims
+* flink1.15-shims
+* flink1.16-shims
 * flink-scala-parent
 * flink-scala-2.11
 * flink-scala-2.12
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index 8bbeebd26f..84223db389 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -35,7 +35,7 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>${flink1.12.version}</flink.version>
+    <flink.version>${flink1.13.version}</flink.version>
     <flink.hadoop.version>${hadoop2.7.version}</flink.hadoop.version>
     <hive.version>2.3.4</hive.version>
     <hiverunner.version>4.0.0</hiverunner.version>
@@ -55,12 +55,6 @@
       <version>${project.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>flink1.12-shims</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>flink1.13-shims</artifactId>
@@ -894,51 +888,6 @@
 
   <profiles>
 
-    <profile>
-      <id>flink-112</id>
-      <properties>
-        <flink.version>${flink1.12.version}</flink.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.reflections</groupId>
-              <artifactId>reflections</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.flink</groupId>
-          <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-          <version>${flink.version}</version>
-          <scope>provided</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
     <profile>
       <id>flink-113</id>
       <properties>
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index a7b6e9871a..916c3313dc 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -54,10 +54,7 @@ public abstract class FlinkShims {
                                       Properties properties)
       throws Exception {
     Class<?> flinkShimsClass;
-    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
-      LOGGER.info("Initializing shims for Flink 1.12");
-      flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink112Shims");
-    } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
+    if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 13) {
       LOGGER.info("Initializing shims for Flink 1.13");
       flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink113Shims");
     } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 14) {
diff --git a/flink/flink1.12-shims/pom.xml b/flink/flink1.12-shims/pom.xml
deleted file mode 100644
index e10f005c7d..0000000000
--- a/flink/flink1.12-shims/pom.xml
+++ /dev/null
@@ -1,211 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>flink-parent</artifactId>
-        <groupId>org.apache.zeppelin</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>flink1.12-shims</artifactId>
-    <packaging>jar</packaging>
-    <name>Zeppelin: Flink1.12 Shims</name>
-
-    <properties>
-        <flink.version>${flink1.12.version}</flink.version>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.zeppelin</groupId>
-            <artifactId>flink-shims</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-scala-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-scala_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.reflections</groupId>
-                    <artifactId>reflections</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-python_${flink.scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>eclipse-add-source</id>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile-first</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <scalaVersion>${flink.scala.version}</scalaVersion>
-                    <args>
-                        <arg>-unchecked</arg>
-                        <arg>-deprecation</arg>
-                        <arg>-feature</arg>
-                        <arg>-target:jvm-1.8</arg>
-                    </args>
-                    <jvmArgs>
-                        <jvmArg>-Xms1024m</jvmArg>
-                        <jvmArg>-Xmx1024m</jvmArg>
-                        <jvmArg>-XX:MaxMetaspaceSize=${MaxMetaspace}</jvmArg>
-                    </jvmArgs>
-                    <javacArgs>
-                        <javacArg>-source</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-target</javacArg>
-                        <javacArg>${java.version}</javacArg>
-                        <javacArg>-Xlint:all,-serial,-path,-options</javacArg>
-                    </javacArgs>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-interpreter-setting</id>
-                        <phase>none</phase>
-                        <configuration>
-                            <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
deleted file mode 100644
index 187eea0b4d..0000000000
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.scala.DataSet;
-import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.*;
-import org.apache.flink.table.factories.ComponentFactoryService;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.TableAggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.utils.PrintUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.FlinkException;
-import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Shims for flink 1.12
- */
-public class Flink112Shims extends FlinkShims {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
-
-  private Flink112SqlInterpreter batchSqlInterpreter;
-  private Flink112SqlInterpreter streamSqlInterpreter;
-
-
-  public Flink112Shims(FlinkVersion flinkVersion, Properties properties) {
-    super(flinkVersion, properties);
-  }
-
-  public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
-    this.batchSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, true);
-  }
-
-  public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
-    this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false);
-  }
-
-  @Override
-  public Object createResourceManager(List<URL> jars, Object tableConfig) {
-    return null;
-  }
-
-  @Override
-  public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List<URL> jars) {
-    return new FunctionCatalog((TableConfig) tableConfig, (CatalogManager) catalogManager, (ModuleManager) moduleManager);
-  }
-
-  @Override
-  public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
-    // do nothing
-  }
-
-  @Override
-  public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj,
-                                                       Object senvObj,
-                                                       Object tableConfigObj,
-                                                       Object moduleManagerObj,
-                                                       Object functionCatalogObj,
-                                                       Object catalogManagerObj,
-                                                       List<URL> jars,
-                                                       ClassLoader classLoader) {
-    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
-    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
-    TableConfig tableConfig = (TableConfig) tableConfigObj;
-    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
-    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
-    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
-    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
-            classLoader, environmentSettings, senv,
-            tableConfig, moduleManager, functionCatalog, catalogManager);
-    Planner planner = (Planner) pair.left;
-    Executor executor = (Executor) pair.right;
-
-    return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager,
-            moduleManager,
-            functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv),
-            planner, executor, environmentSettings.isStreamingMode(), classLoader);
-  }
-
-  @Override
-  public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj,
-                                                      Object senvObj,
-                                                      Object tableConfigObj,
-                                                      Object moduleManagerObj,
-                                                      Object functionCatalogObj,
-                                                      Object catalogManagerObj,
-                                                      List<URL> jars,
-                                                      ClassLoader classLoader) {
-    EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj;
-    StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj;
-    TableConfig tableConfig = (TableConfig) tableConfigObj;
-    ModuleManager moduleManager = (ModuleManager) moduleManagerObj;
-    FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj;
-    CatalogManager catalogManager = (CatalogManager) catalogManagerObj;
-    ImmutablePair<Object, Object> pair = createPlannerAndExecutor(
-            classLoader, environmentSettings, senv,
-            tableConfig, moduleManager, functionCatalog, catalogManager);
-    Planner planner = (Planner) pair.left;
-    Executor executor = (Executor) pair.right;
-
-    return new StreamTableEnvironmentImpl(catalogManager, moduleManager,
-            functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode(), classLoader);
-  }
-  @Override
-  public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
-    return new StreamExecutionEnvironmentFactory() {
-      @Override
-      public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) {
-        return (StreamExecutionEnvironment) streamExecutionEnvironment;
-      }
-    };
-  }
-
-  @Override
-  public Object createCatalogManager(Object config) {
-    return CatalogManager.newBuilder()
-            .classLoader(Thread.currentThread().getContextClassLoader())
-            .config((ReadableConfig) config)
-            .defaultCatalog("default_catalog",
-                    new GenericInMemoryCatalog("default_catalog", "default_database"))
-            .build();
-  }
-
-  @Override
-  public String getPyFlinkPythonPath(Properties properties) throws IOException {
-    String mode = properties.getProperty("flink.execution.mode");
-    if ("yarn-application".equalsIgnoreCase(mode)) {
-      // for yarn application mode, FLINK_HOME is container working directory
-      String flinkHome = new File(".").getAbsolutePath();
-      return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
-    }
-
-    String flinkHome = System.getenv("FLINK_HOME");
-    if (StringUtils.isNotBlank(flinkHome)) {
-      return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
-    } else {
-      throw new IOException("No FLINK_HOME is specified");
-    }
-  }
-
-  private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
-    LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
-    if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
-      throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
-              pyFlinkFolder.getAbsolutePath()));
-    }
-    List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
-    StringBuilder builder = new StringBuilder();
-    for (File file : depFiles) {
-      LOGGER.info("Adding extracted file {} to PYTHONPATH", 
file.getAbsolutePath());
-      builder.append(file.getAbsolutePath() + ":");
-    }
-    return builder.toString();
-  }
-
-  @Override
-  public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
-    return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row>>) serializer);
-  }
-
-  @Override
-  public List collectToList(Object table) throws Exception {
-    return Lists.newArrayList(((Table) table).execute().collect());
-  }
-
-  @Override
-  public boolean rowEquals(Object row1, Object row2) {
-    Row r1 = (Row) row1;
-    Row r2 = (Row) row2;
-    r1.setKind(RowKind.INSERT);
-    r2.setKind(RowKind.INSERT);
-    return r1.equals(r2);
-  }
-
-  @Override
-  public Object fromDataSet(Object btenv, Object ds) {
-    return Flink112ScalaShims.fromDataSet((BatchTableEnvironment) btenv, (DataSet) ds);
-  }
-
-  @Override
-  public Object toDataSet(Object btenv, Object table) {
-    return Flink112ScalaShims.toDataSet((BatchTableEnvironment) btenv, (Table) table);
-  }
-
-  @Override
-  public void registerTableSink(Object stenv, String tableName, Object collectTableSink) {
-    ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv)
-            .registerTableSinkInternal(tableName, (TableSink) collectTableSink);
-  }
-
-  @Override
-  public void registerScalarFunction(Object btenv, String name, Object scalarFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction);
-  }
-
-  @Override
-  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction);
-  }
-
-  @Override
-  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction);
-  }
-
-  @Override
-  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
-    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
-  }
-
-  /**
-   * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
-   * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
-   * @param catalogManager
-   * @param parserObject
-   * @param environmentSetting
-   */
-  @Override
-  public void setCatalogManagerSchemaResolver(Object catalogManager,
-                                              Object parserObject,
-                                              Object environmentSetting) {
-    ((CatalogManager) catalogManager).setCatalogTableSchemaResolver(
-            new CatalogTableSchemaResolver((Parser)parserObject,
-                    ((EnvironmentSettings)environmentSetting).isStreamingMode()));
-  }
-
-  @Override
-  public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) {
-    CustomCommandLine customCommandLine = ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
-    try {
-       ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine));
-       return effectiveConfig;
-    } catch (FlinkException e) {
-      throw new RuntimeException("Fail to call addAll", e);
-    }
-  }
-
-  @Override
-  public String[] rowToString(Object row, Object table, Object tableConfig) {
-    return PrintUtils.rowToString((Row) row);
-  }
-
-  public boolean isTimeIndicatorType(Object type) {
-    return FlinkTypeFactory.isTimeIndicatorType((TypeInformation<?>) type);
-  }
-
-  private Object lookupExecutor(ClassLoader classLoader,
-                                Object settings,
-                                Object sEnv) {
-    try {
-      Map<String, String> executorProperties = ((EnvironmentSettings) settings).toExecutorProperties();
-      ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
-      Method createMethod = executorFactory.getClass()
-              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
-
-      return createMethod.invoke(
-              executorFactory,
-              executorProperties,
-              sEnv);
-    } catch (Exception e) {
-      throw new TableException(
-              "Could not instantiate the executor. Make sure a planner module 
is on the classpath",
-              e);
-    }
-  }
-
-  @Override
-  public ImmutablePair<Object, Object> createPlannerAndExecutor(
-          ClassLoader classLoader, Object environmentSettings, Object sEnv,
-          Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) {
-    EnvironmentSettings settings = (EnvironmentSettings) environmentSettings;
-    Executor executor = (Executor) lookupExecutor(classLoader, settings, sEnv);
-    Map<String, String> plannerProperties = settings.toPlannerProperties();
-    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
-            .create(plannerProperties, executor, (TableConfig) tableConfig,
-                    (FunctionCatalog) functionCatalog,
-                    (CatalogManager) catalogManager);
-    return ImmutablePair.of(planner, executor);
-  }
-
-  @Override
-  public Object createBlinkPlannerEnvSettingBuilder() {
-    return EnvironmentSettings.newInstance().useBlinkPlanner();
-  }
-
-  @Override
-  public Object createOldPlannerEnvSettingBuilder() {
-    return EnvironmentSettings.newInstance().useOldPlanner();
-  }
-
-  @Override
-  public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
-    if (isBatch) {
-      return batchSqlInterpreter.runSqlList(st, context);
-    } else {
-      return streamSqlInterpreter.runSqlList(st, context);
-    }
-  }
-}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
deleted file mode 100644
index b72ef266f7..0000000000
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
+++ /dev/null
@@ -1,584 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.utils.EncodingUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.CollectionUtil;
-import org.apache.zeppelin.flink.shims112.SqlCommandParser;
-import org.apache.zeppelin.flink.shims112.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.util.SqlSplitter;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-public class Flink112SqlInterpreter {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Flink112SqlInterpreter.class);
-  private static final AttributedString MESSAGE_HELP =
-          new AttributedStringBuilder()
-                  .append("The following commands are available:\n\n")
-                  .append(
-                          formatCommand(
-                                  SqlCommand.CREATE_TABLE,
-                                  "Create table under current catalog and database."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.DROP_TABLE,
-                                  "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.CREATE_VIEW,
-                                  "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.DESCRIBE,
-                                  "Describes the schema of a table with the given name."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.DROP_VIEW,
-                                  "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.EXPLAIN,
-                                  "Describes the execution plan of a query or table with the given name."))
-                  .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.INSERT_INTO,
-                                  "Inserts the results of a SQL SELECT query into a declared table sink."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.INSERT_OVERWRITE,
-                                  "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.SELECT,
-                                  "Executes a SQL SELECT query on the Flink cluster."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.SET,
-                                  "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.SHOW_FUNCTIONS,
-                                  "Shows all user-defined and built-in functions."))
-                  .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.USE_CATALOG,
-                                  "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
-                  .append(
-                          formatCommand(
-                                  SqlCommand.USE,
-                                  "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
-                  .style(AttributedStyle.DEFAULT.underline())
-                  .append("\nHint")
-                  .style(AttributedStyle.DEFAULT)
-                  .append(
-                          ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-                  .toAttributedString();
-
-  private static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
-    return new AttributedStringBuilder()
-            .style(AttributedStyle.DEFAULT.bold())
-            .append(cmd.toString())
-            .append("\t\t")
-            .style(AttributedStyle.DEFAULT)
-            .append(description)
-            .append('\n')
-            .toAttributedString();
-  }
-
-  private FlinkSqlContext flinkSqlContext;
-  private TableEnvironment tbenv;
-  private ZeppelinContext z;
-  private SqlCommandParser sqlCommandParser;
-  private SqlSplitter sqlSplitter;
-  private boolean isBatch;
-  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
-  // paragraphId -> StatementSet
-  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
-
-
-  public Flink112SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
-    this.flinkSqlContext = flinkSqlContext;
-    this.isBatch = isBatch;
-    if (isBatch) {
-      this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
-    } else {
-      this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
-    }
-    this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
-    this.sqlCommandParser = new SqlCommandParser((TableEnvironmentInternal) tbenv);
-    this.sqlSplitter = new SqlSplitter();
-    JobListener jobListener = new JobListener() {
-      @Override
-      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
-        if (lock.isHeldByCurrentThread()) {
-          lock.unlock();
-          LOGGER.info("UnLock JobSubmitLock");
-        }
-      }
-
-      @Override
-      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
-
-      }
-    };
-
-    ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
-    ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
-  }
-
-  public InterpreterResult runSqlList(String st, InterpreterContext context) {
-    try {
-      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
-      boolean isFirstInsert = true;
-      boolean hasInsert = false;
-      for (String sql : sqls) {
-        SqlCommandParser.SqlCommandCall sqlCommand = null;
-        try {
-          sqlCommand = sqlCommandParser.parse(sql);
-        } catch (Exception e1) {
-          try {
-            context.out.write("%text Invalid Sql statement: " + sql + "\n");
-            context.out.write(e1.toString());
-            context.out.write(MESSAGE_HELP.toString());
-          } catch (IOException e2) {
-            return new InterpreterResult(InterpreterResult.Code.ERROR, e2.toString());
-          }
-          return new InterpreterResult(InterpreterResult.Code.ERROR);
-        }
-
-        try {
-          if (sqlCommand.command == SqlCommand.INSERT_INTO ||
-                  sqlCommand.command == SqlCommand.INSERT_OVERWRITE) {
-            hasInsert = true;
-            if (isFirstInsert && runAsOne) {
-              startMultipleInsert(context);
-              isFirstInsert = false;
-            }
-          }
-          callCommand(sqlCommand, sql, context);
-          context.out.flush();
-        } catch (Throwable e) {
-          LOGGER.error("Fail to run sql:" + sql, e);
-          try {
-            context.out.write("%text Fail to run sql command: " +
-                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
-          } catch (IOException ex) {
-            LOGGER.warn("Unexpected exception:", ex);
-            return new InterpreterResult(InterpreterResult.Code.ERROR,
-                    ExceptionUtils.getStackTrace(e));
-          }
-          return new InterpreterResult(InterpreterResult.Code.ERROR);
-        }
-      }
-
-      if (runAsOne && hasInsert) {
-        try {
-          lock.lock();
-          String jobName = context.getStringLocalProperty("jobName", st);
-          if (executeMultipleInsertInto(jobName, context)) {
-            context.out.write("Insertion successfully.\n");
-          }
-        } catch (Exception e) {
-          LOGGER.error("Fail to execute sql as one job", e);
-          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
-        } finally {
-          if (lock.isHeldByCurrentThread()) {
-            lock.unlock();
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOGGER.error("Fail to execute sql", e);
-      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
-    } finally {
-      statementSetMap.remove(context.getParagraphId());
-    }
-
-    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
-  }
-
-  private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
-                           String sql,
-                           InterpreterContext context) throws Exception {
-    switch (cmdCall.command) {
-      case SET:
-        callSet(cmdCall, context);
-        break;
-      case HELP:
-        callHelp(context);
-        break;
-      case SHOW_CATALOGS:
-        callShowCatalogs(context);
-        break;
-      case SHOW_CURRENT_CATALOG:
-        callShowCurrentCatalog(context);
-        break;
-      case SHOW_DATABASES:
-        callShowDatabases(context);
-        break;
-      case SHOW_CURRENT_DATABASE:
-        callShowCurrentDatabase(context);
-        break;
-      case SHOW_TABLES:
-        callShowTables(context);
-        break;
-      case SHOW_FUNCTIONS:
-        callShowFunctions(context);
-        break;
-      case SHOW_MODULES:
-        callShowModules(context);
-        break;
-      case SHOW_PARTITIONS:
-        callShowPartitions(sql, context);
-        break;
-      case USE_CATALOG:
-        callUseCatalog(cmdCall.operands[0], context);
-        break;
-      case USE:
-        callUseDatabase(cmdCall.operands[0], context);
-        break;
-      case DESC:
-      case DESCRIBE:
-        callDescribe(cmdCall.operands[0], context);
-        break;
-      case EXPLAIN:
-        callExplain(cmdCall.operands[0], context);
-        break;
-      case SELECT:
-        callSelect(cmdCall.operands[0], context);
-        break;
-      case INSERT_INTO:
-      case INSERT_OVERWRITE:
-        callInsertInto(cmdCall.operands[0], context);
-        break;
-      case CREATE_TABLE:
-        callDDL(sql, context, "Table has been created.");
-        break;
-      case DROP_TABLE:
-        callDDL(sql, context, "Table has been dropped.");
-        break;
-      case ALTER_TABLE:
-        callDDL(sql, context, "Alter table succeeded!");
-        break;
-      case CREATE_VIEW:
-        callDDL(sql, context, "View has been created.");
-        break;
-      case DROP_VIEW:
-        callDDL(sql, context, "View has been dropped.");
-        break;
-      case ALTER_VIEW:
-        callDDL(sql, context, "Alter view succeeded!");
-        break;
-      case CREATE_FUNCTION:
-        callDDL(sql, context, "Function has been created.");
-        break;
-      case DROP_FUNCTION:
-        callDDL(sql, context, "Function has been removed.");
-        break;
-      case ALTER_FUNCTION:
-        callDDL(sql, context, "Alter function succeeded!");
-        break;
-      case CREATE_DATABASE:
-        callDDL(sql, context, "Database has been created.");
-        break;
-      case DROP_DATABASE:
-        callDDL(sql, context, "Database has been removed.");
-        break;
-      case ALTER_DATABASE:
-        callDDL(sql, context, "Alter database succeeded!");
-        break;
-      case CREATE_CATALOG:
-        callDDL(sql, context, "Catalog has been created.");
-        break;
-      case DROP_CATALOG:
-        callDDL(sql, context, "Catalog has been dropped.");
-        break;
-      default:
-        throw new Exception("Unsupported command: " + cmdCall.command);
-    }
-  }
-
-  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
-    try {
-      lock.lock();
-      this.tbenv.executeSql(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write(message + "\n");
-  }
-
-  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
-    tbenv.executeSql("USE CATALOG `" + catalog + "`");
-  }
-
-  private void callHelp(InterpreterContext context) throws IOException {
-    context.out.write(MESSAGE_HELP.toString() + "\n");
-  }
-
-  private void callShowCatalogs(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
-    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
-            .map(r -> checkNotNull(r.getField(0)).toString())
-            .collect(Collectors.toList());
-    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + 
"\n");
-  }
-
-  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
-    String catalog = tableResult.collect().next().toString();
-    context.out.write("%text current catalog: " + catalog + "\n");
-  }
-
-  private void callShowDatabases(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
-    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
-            .map(r -> checkNotNull(r.getField(0)).toString())
-            .collect(Collectors.toList());
-    context.out.write(
-            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
-  }
-
-  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
-    String database = tableResult.collect().next().toString();
-    context.out.write("%text current database: " + database + "\n");
-  }
-
-  private void callShowTables(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
-    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
-            .map(r -> checkNotNull(r.getField(0)).toString())
-            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
-            .collect(Collectors.toList());
-    context.out.write(
-            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
-  }
-
-  private void callShowFunctions(InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
-    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
-            .map(r -> checkNotNull(r.getField(0)).toString())
-            .collect(Collectors.toList());
-    context.out.write(
-            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
-  }
-
-  private void callShowModules(InterpreterContext context) throws IOException {
-    String[] modules = this.tbenv.listModules();
-    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + 
"\n");
-  }
-
-  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
-    TableResult tableResult = this.tbenv.executeSql(sql);
-    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
-            .map(r -> checkNotNull(r.getField(0)).toString())
-            .collect(Collectors.toList());
-    context.out.write(
-            "%table partitions\n" + StringUtils.join(functions, "\n") + "\n");
-  }
-
-  public void startMultipleInsert(InterpreterContext context) throws Exception {
-    StatementSet statementSet = tbenv.createStatementSet();
-    statementSetMap.put(context.getParagraphId(), statementSet);
-  }
-
-  public void addInsertStatement(String sql, InterpreterContext context) throws Exception {
-    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
-  }
-
-  public boolean executeMultipleInsertInto(String jobName, InterpreterContext context) throws Exception {
-    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while (!jobClient.getJobStatus().get().isTerminalState()) {
-      LOGGER.debug("Wait for job to finish");
-      Thread.sleep(1000 * 5);
-    }
-    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
-      context.out.write("Job is cancelled.\n");
-      return false;
-    }
-    return true;
-  }
-
-  private void callUseDatabase(String databaseName,
-                               InterpreterContext context) throws IOException {
-    this.tbenv.executeSql("USE `" + databaseName + "`");
-  }
-
-  private void callDescribe(String name, InterpreterContext context) throws IOException {
-    TableResult tableResult = null;
-    try {
-      tableResult = tbenv.executeSql("DESCRIBE " + name);
-    } catch (Exception e) {
-      throw new IOException("Fail to describe table: " + name, e);
-    }
-    CloseableIterator<Row> result = tableResult.collect();
-    StringBuilder builder = new StringBuilder();
-    builder.append("Column\tType\n");
-    while (result.hasNext()) {
-      Row row = result.next();
-      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
-    }
-    context.out.write("%table\n" + builder.toString());
-  }
-
-  private void callExplain(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      TableResult tableResult = tbenv.executeSql(sql);
-      String result = tableResult.collect().next().getField(0).toString();
-      context.out.write(result + "\n");
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-  }
-
-  public void callSelect(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      if (isBatch) {
-        callBatchInnerSelect(sql, context);
-      } else {
-        callStreamInnerSelect(sql, context);
-      }
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-  }
-
-  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
-    Table table = this.tbenv.sqlQuery(sql);
-    String result = z.showData(table);
-    context.out.write(result);
-  }
-
-  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
-    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
-  }
-
-  private String removeSingleQuote(String value) {
-    value = value.trim();
-    if (value.startsWith("'")) {
-      value = value.substring(1);
-    }
-    if (value.endsWith("'")) {
-      value = value.substring(0, value.length() - 1);
-    }
-    return value;
-  }
-
-  public void callSet(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws Exception {
-    if (sqlCommand.operands.length == 0) {
-      // show all properties
-      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
-      List<String> prettyEntries = new ArrayList<>();
-      for (String k : properties.keySet()) {
-        prettyEntries.add(
-                String.format(
-                        "'%s' = '%s'",
-                        EncodingUtils.escapeSingleQuotes(k),
-                        EncodingUtils.escapeSingleQuotes(properties.get(k))));
-      }
-      prettyEntries.sort(String::compareTo);
-      prettyEntries.forEach(entry -> {
-        try {
-          context.out.write(entry + "\n");
-        } catch (IOException e) {
-          LOGGER.warn("Fail to write output", e);
-        }
-      });
-    } else {
-      String key = removeSingleQuote(sqlCommand.operands[0]);
-      String value = removeSingleQuote(sqlCommand.operands[1]);
-      if ("execution.runtime-mode".equals(key)) {
-        throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, " +
-                "you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
-      }
-      LOGGER.info("Set table config: {}={}", key, value);
-      this.tbenv.getConfig().getConfiguration().setString(key, value);
-    }
-  }
-
-  public void callInsertInto(String sql,
-                             InterpreterContext context) throws IOException {
-    if (!isBatch) {
-      context.getLocalProperties().put("flink.streaming.insert_into", "true");
-    }
-    try {
-      lock.lock();
-      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-      if (!runAsOne) {
-        this.tbenv.sqlUpdate(sql);
-        String jobName = context.getStringLocalProperty("jobName", sql);
-        this.tbenv.execute(jobName);
-        context.out.write("Insertion successfully.\n");
-      } else {
-        addInsertStatement(sql, context);
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-  }
-}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
deleted file mode 100644
index 3650254e02..0000000000
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/CollectStreamTableSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims112;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.experimental.CollectSink;
-import org.apache.flink.table.sinks.RetractStreamTableSink;
-import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-/**
- * Table sink for collecting the results locally using sockets.
- */
-public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class);
-
-  private final InetAddress targetAddress;
-  private final int targetPort;
-  private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
-
-  private String[] fieldNames;
-  private TypeInformation<?>[] fieldTypes;
-
-  public CollectStreamTableSink(InetAddress targetAddress,
-                                int targetPort,
-                                TypeSerializer<Tuple2<Boolean, Row>> serializer) {
-    LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort);
-    this.targetAddress = targetAddress;
-    this.targetPort = targetPort;
-    this.serializer = serializer;
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return fieldNames;
-  }
-
-  @Override
-  public TypeInformation<?>[] getFieldTypes() {
-    return fieldTypes;
-  }
-
-  @Override
-  public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-    final CollectStreamTableSink copy =
-            new CollectStreamTableSink(targetAddress, targetPort, serializer);
-    copy.fieldNames = fieldNames;
-    copy.fieldTypes = fieldTypes;
-    return copy;
-  }
-
-  @Override
-  public TypeInformation<Row> getRecordType() {
-    return Types.ROW_NAMED(fieldNames, fieldTypes);
-  }
-
-  @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
-    // add sink
-    return stream
-            .addSink(new CollectSink<>(targetAddress, targetPort, serializer))
-            .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID())
-            .setParallelism(1);
-  }
-
-  @Override
-  public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-    return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
-  }
-}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
deleted file mode 100644
index 309250f583..0000000000
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims112;
-
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.operations.*;
-import org.apache.flink.table.operations.ddl.*;
-
-import javax.annotation.Nullable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This is copied from flink project with minor modification.
- * Simple parser for determining the type of command and its parameters.
- * */
-public final class SqlCommandParser {
-
-  private Parser parser;
-
-  public SqlCommandParser(TableEnvironmentInternal tbenv) {
-    this.parser = tbenv.getParser();
-  }
-
-  /**
-   * Parse a sql statement and return corresponding {@link SqlCommandCall}. If the statement is
-   * invalid, a {@link Exception} will be thrown.
-   *
-   * @param stmt The statement to be parsed
-   * @return the corresponding SqlCommandCall.
-   */
-  public SqlCommandCall parse(String stmt) throws Exception {
-    // normalize
-    stmt = stmt.trim();
-    // remove ';' at the end
-    if (stmt.endsWith(";")) {
-      stmt = stmt.substring(0, stmt.length() - 1).trim();
-    }
-
-    // parse statement via regex matching first
-    Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
-    if (callOpt.isPresent()) {
-      return callOpt.get();
-    } else {
-      return parseBySqlParser(stmt);
-    }
-  }
-
-  private SqlCommandCall parseBySqlParser(String stmt) throws Exception {
-    List<Operation> operations;
-    try {
-      operations = parser.parse(stmt);
-    } catch (Throwable e) {
-      throw new Exception("Invalidate SQL statement.", e);
-    }
-    if (operations.size() != 1) {
-      throw new Exception("Only single statement is supported now.");
-    }
-
-    final SqlCommand cmd;
-    String[] operands = new String[] {stmt};
-    Operation operation = operations.get(0);
-    if (operation instanceof CatalogSinkModifyOperation) {
-      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
-      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
-    } else if (operation instanceof CreateTableOperation) {
-      cmd = SqlCommand.CREATE_TABLE;
-    } else if (operation instanceof DropTableOperation) {
-      cmd = SqlCommand.DROP_TABLE;
-    } else if (operation instanceof AlterTableOperation) {
-      cmd = SqlCommand.ALTER_TABLE;
-    } else if (operation instanceof CreateViewOperation) {
-      cmd = SqlCommand.CREATE_VIEW;
-    } else if (operation instanceof DropViewOperation) {
-      cmd = SqlCommand.DROP_VIEW;
-    } else if (operation instanceof AlterViewOperation) {
-      cmd = SqlCommand.ALTER_VIEW;
-    } else if (operation instanceof CreateDatabaseOperation) {
-      cmd = SqlCommand.CREATE_DATABASE;
-    } else if (operation instanceof DropDatabaseOperation) {
-      cmd = SqlCommand.DROP_DATABASE;
-    } else if (operation instanceof AlterDatabaseOperation) {
-      cmd = SqlCommand.ALTER_DATABASE;
-    } else if (operation instanceof CreateCatalogOperation) {
-      cmd = SqlCommand.CREATE_CATALOG;
-    } else if (operation instanceof DropCatalogOperation) {
-      cmd = SqlCommand.DROP_CATALOG;
-    } else if (operation instanceof UseCatalogOperation) {
-      cmd = SqlCommand.USE_CATALOG;
-      operands = new String[] {((UseCatalogOperation) operation).getCatalogName()};
-    } else if (operation instanceof UseDatabaseOperation) {
-      cmd = SqlCommand.USE;
-      operands = new String[] {((UseDatabaseOperation) operation).getDatabaseName()};
-    } else if (operation instanceof ShowCatalogsOperation) {
-      cmd = SqlCommand.SHOW_CATALOGS;
-      operands = new String[0];
-    } else if (operation instanceof ShowCurrentCatalogOperation) {
-      cmd = SqlCommand.SHOW_CURRENT_CATALOG;
-      operands = new String[0];
-    } else if (operation instanceof ShowDatabasesOperation) {
-      cmd = SqlCommand.SHOW_DATABASES;
-      operands = new String[0];
-    } else if (operation instanceof ShowCurrentDatabaseOperation) {
-      cmd = SqlCommand.SHOW_CURRENT_DATABASE;
-      operands = new String[0];
-    } else if (operation instanceof ShowTablesOperation) {
-      cmd = SqlCommand.SHOW_TABLES;
-      operands = new String[0];
-    } else if (operation instanceof ShowFunctionsOperation) {
-      cmd = SqlCommand.SHOW_FUNCTIONS;
-      operands = new String[0];
-    } else if (operation instanceof ShowPartitionsOperation) {
-      cmd = SqlCommand.SHOW_PARTITIONS;
-    } else if (operation instanceof CreateCatalogFunctionOperation
-            || operation instanceof CreateTempSystemFunctionOperation) {
-      cmd = SqlCommand.CREATE_FUNCTION;
-    } else if (operation instanceof DropCatalogFunctionOperation
-            || operation instanceof DropTempSystemFunctionOperation) {
-      cmd = SqlCommand.DROP_FUNCTION;
-    } else if (operation instanceof AlterCatalogFunctionOperation) {
-      cmd = SqlCommand.ALTER_FUNCTION;
-    } else if (operation instanceof ExplainOperation) {
-      cmd = SqlCommand.EXPLAIN;
-    } else if (operation instanceof DescribeTableOperation) {
-      cmd = SqlCommand.DESCRIBE;
-      operands =
-              new String[] {
-                      ((DescribeTableOperation) operation)
-                              .getSqlIdentifier()
-                              .asSerializableString()
-              };
-    } else if (operation instanceof QueryOperation) {
-      cmd = SqlCommand.SELECT;
-    } else {
-      throw new Exception("Unknown operation: " + operation.asSummaryString());
-    }
-
-    return new SqlCommandCall(cmd, operands);
-  }
-
-  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
-    // parse statement via regex matching
-    for (SqlCommand cmd : SqlCommand.values()) {
-      if (cmd.hasRegexPattern()) {
-        final Matcher matcher = cmd.pattern.matcher(stmt);
-        if (matcher.matches()) {
-          final String[] groups = new String[matcher.groupCount()];
-          for (int i = 0; i < groups.length; i++) {
-            groups[i] = matcher.group(i + 1);
-          }
-          return cmd.operandConverter
-                  .apply(groups)
-                  .map(
-                          (operands) -> {
-                            String[] newOperands = operands;
-                            if (cmd == SqlCommand.EXPLAIN) {
-                              // convert `explain xx` to `explain plan for xx`
-                              // which can execute through executeSql method
-                              newOperands =
-                                      new String[] {
-                                              "EXPLAIN PLAN FOR "
-                                                      + operands[0]
-                                                      + " "
-                                                      + operands[1]
-                                      };
-                            }
-                            return new SqlCommandCall(cmd, newOperands);
-                          });
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  // --------------------------------------------------------------------------------------------
-
-  private static final Function<String[], Optional<String[]>> NO_OPERANDS =
-          (operands) -> Optional.of(new String[0]);
-
-  private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
-          (operands) -> Optional.of(new String[] {operands[0]});
-
-  private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
-
-  /** Supported SQL commands. */
-  public enum SqlCommand {
-
-    HELP("HELP", NO_OPERANDS),
-
-    SHOW_CATALOGS,
-
-    SHOW_CURRENT_CATALOG,
-
-    SHOW_DATABASES,
-
-    SHOW_CURRENT_DATABASE,
-
-    SHOW_TABLES,
-
-    SHOW_FUNCTIONS,
-
-    // FLINK-17396
-    SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),
-
-    SHOW_PARTITIONS,
-
-    USE_CATALOG,
-
-    USE,
-
-    CREATE_CATALOG,
-
-    DROP_CATALOG,
-
-    DESC("DESC\\s+(.*)", SINGLE_OPERAND),
-
-    DESCRIBE,
-
-    // supports both `explain xx` and `explain plan for xx` now
-    // TODO should keep `explain xx` ?
-    // only match "EXPLAIN SELECT xx" and "EXPLAIN INSERT xx" here
-    // "EXPLAIN PLAN FOR xx" should be parsed via sql parser
-    EXPLAIN(
-            "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
-            (operands) -> {
-              return Optional.of(new String[] {operands[0], operands[1]});
-            }),
-
-    CREATE_DATABASE,
-
-    DROP_DATABASE,
-
-    ALTER_DATABASE,
-
-    CREATE_TABLE,
-
-    DROP_TABLE,
-
-    ALTER_TABLE,
-
-    CREATE_VIEW,
-
-    DROP_VIEW,
-
-    ALTER_VIEW,
-
-    CREATE_FUNCTION,
-
-    DROP_FUNCTION,
-
-    ALTER_FUNCTION,
-
-    SELECT,
-
-    INSERT_INTO,
-
-    INSERT_OVERWRITE,
-
-    SET(
-            "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the 
left side of '='
-            (operands) -> {
-              if (operands.length < 3) {
-                return Optional.empty();
-              } else if (operands[0] == null) {
-                return Optional.of(new String[0]);
-              }
-              return Optional.of(new String[] {operands[1], operands[2]});
-            }),
-
-    SOURCE("SOURCE\\s+(.*)", SINGLE_OPERAND);
-
-    public final @Nullable Pattern pattern;
-    public final @Nullable Function<String[], Optional<String[]>> operandConverter;
-
-    SqlCommand() {
-      this.pattern = null;
-      this.operandConverter = null;
-    }
-
-    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
-      this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
-      this.operandConverter = operandConverter;
-    }
-
-    @Override
-    public String toString() {
-      return super.toString().replace('_', ' ');
-    }
-
-    public boolean hasOperands() {
-      return operandConverter != NO_OPERANDS;
-    }
-
-    public boolean hasRegexPattern() {
-      return pattern != null;
-    }
-  }
-
-  /** Call of SQL command with operands and command type. */
-  public static class SqlCommandCall {
-    public final SqlCommand command;
-    public final String[] operands;
-
-    public SqlCommandCall(SqlCommand command, String[] operands) {
-      this.command = command;
-      this.operands = operands;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SqlCommandCall that = (SqlCommandCall) o;
-      return command == that.command && Arrays.equals(operands, that.operands);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = Objects.hash(command);
-      result = 31 * result + Arrays.hashCode(operands);
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return command + "(" + Arrays.toString(operands) + ")";
-    }
-  }
-}
\ No newline at end of file
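The regex-based commands above are easiest to follow with a concrete match. A minimal stand-alone sketch (hypothetical class name, only java.util.regex) shows how the SET pattern splits a statement into its key and value operands:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SetPatternSketch {
      // Same pattern and flags as the SqlCommand.SET entry above.
      private static final Pattern SET_PATTERN = Pattern.compile(
          "SET(\\s+(\\S+)\\s*=(.*))?", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

      public static void main(String[] args) {
        Matcher m = SET_PATTERN.matcher("set execution.checkpointing.interval = 10s");
        if (m.matches()) {
          if (m.group(1) == null) {
            // bare "SET" with no key/value: list all properties
            System.out.println("show all properties");
          } else {
            // whitespace around the key is consumed by the pattern; the value keeps
            // its leading space, which is why the interpreter trims and strips quotes later
            System.out.println("key   = " + m.group(2)); // execution.checkpointing.interval
            System.out.println("value = " + m.group(3)); // " 10s"
          }
        }
      }
    }
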
diff --git a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala b/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
deleted file mode 100644
index 988ad4e289..0000000000
--- a/flink/flink1.12-shims/src/main/scala/org/apache/zeppelin/flink/shims112/Flink112ScalaShims.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.shims112
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
-import org.apache.flink.types.Row
-
-object Flink112ScalaShims {
-
-  def fromDataSet(btenv: BatchTableEnvironment, ds: DataSet[_]): Table = {
-    btenv.fromDataSet(ds)
-  }
-
-  def toDataSet(btenv: BatchTableEnvironment, table: Table): DataSet[Row] = {
-    btenv.toDataSet[Row](table)
-  }
-}
diff --git a/flink/pom.xml b/flink/pom.xml
index 5ac374ce37..c0e17389c3 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -36,7 +36,6 @@
     <modules>
         <module>flink-scala-parent</module>
         <module>flink-shims</module>
-        <module>flink1.12-shims</module>
         <module>flink1.13-shims</module>
         <module>flink1.14-shims</module>
         <module>flink1.15-shims</module>
@@ -44,7 +43,6 @@
     </modules>
 
     <properties>
-        <flink1.12.version>1.12.4</flink1.12.version>
         <flink1.13.version>1.13.2</flink1.13.version>
         <flink1.14.version>1.14.0</flink1.14.version>
         <flink1.15.version>1.15.1</flink1.15.version>
@@ -87,16 +85,6 @@
             </modules>
         </profile>
 
-        <profile>
-            <id>flink-112</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <modules>
-                <module>flink-scala-2.11</module>
-                <module>flink-scala-2.12</module>
-            </modules>
-        </profile>
     </profiles>
 
 </project>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
deleted file mode 100644
index 50bb29cf30..0000000000
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest112.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(value = Parameterized.class)
-public class FlinkIntegrationTest112 extends FlinkIntegrationTest {
-
-  @Parameterized.Parameters
-  public static List<Object[]> data() {
-    return Arrays.asList(new Object[][]{
-            {"1.12.4", "2.11"},
-            {"1.12.4", "2.12"}
-    });
-  }
-
-  public FlinkIntegrationTest112(String flinkVersion, String scalaVersion) {
-    super(flinkVersion, scalaVersion);
-  }
-}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 2b7e3a622a..1c3fb82eae 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -68,7 +68,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
 
     notebook = TestUtils.getInstance(Notebook.class);
     sparkHome = DownloadUtils.downloadSpark("2.4.4", "2.7");
-    flinkHome = DownloadUtils.downloadFlink("1.12.4", "2.11");
+    flinkHome = DownloadUtils.downloadFlink("1.13.2", "2.11");
   }
 
   @AfterClass
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
deleted file mode 100644
index d0c6b46cde..0000000000
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest112.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.integration;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-//@RunWith(value = Parameterized.class)
-public class ZeppelinFlinkClusterTest112 extends ZeppelinFlinkClusterTest {
-
-  @Parameterized.Parameters
-  public static List<Object[]> data() {
-    return Arrays.asList(new Object[][]{
-            {"1.12.4", "2.11"},
-            {"1.12.4", "2.12"}
-    });
-  }
-
-  public ZeppelinFlinkClusterTest112(String flinkVersion, String scalaVersion) throws Exception {
-    super(flinkVersion, scalaVersion);
-  }
-}
