This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 0bf19c1  [ZEPPELIN-4884] Support all sql of flink 1.11
0bf19c1 is described below

commit 0bf19c1e40f8804be7b3820961720ca43250060e
Author: dijie <nj18652727...@gmail.com>
AuthorDate: Sat Jul 11 11:12:54 2020 +0800

    [ZEPPELIN-4884] Support all sql of flink 1.11
    
    ### What is this PR for?
    Support all sql of flink 1.11
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * [ZEPPELIN-4884] https://issues.apache.org/jira/browse/ZEPPELIN-4884
    
    ### How should this be tested?
    * travis url https://travis-ci.org/github/lonelyGhostisdog/zeppelin
    * Unit Tests & GUI Tests
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * No.
    
    Author: dijie <nj18652727...@gmail.com>
    Author: Jeff Zhang <zjf...@apache.org>
    Author: 1721563...@qq.com <a18652727118>
    
    Closes #3842 from lonelyGhostisdog/ZEPPELIN-4884 and squashes the following commits:
    
    4dea34e17 [dijie] [ZEPPELIN-4884] fix bug.
    962d92885 [dijie] [ZEPPELIN-4884] fix NPE & resolve conflict
    add55c5a2 [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/lonelyGhostisdog/zeppelin into ZEPPELIN-4884
    3f4074839 [dijie] Merge branch 'master' into ZEPPELIN-4884
    dd53e9a0d [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884
    54cf5a16a [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884
    2b3157f99 [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/zjffdu/zeppelin into ZEPPELIN-4884
    1e37860af [Jeff Zhang] set CatalogTableSchema resolver before running code
    193567bff [Jeff Zhang] address NPE
    ba8ecc501 [Jeff Zhang] [ZEPPELIN-4884]. Support all sql of flink 1.11
    4ecb231c2 [dijie] [ZEPPELIN-4884] add UT for flink 1.11
    0267de4cc [dijie] [ZEPPELIN-4884] fix NPE
    372b61f2d [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/zjffdu/zeppelin into ZEPPELIN-4884
    1c7128378 [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884
    71d237380 [Jeff Zhang] address NPE
    9786ea116 [Jeff Zhang] [ZEPPELIN-4884]. Support all sql of flink 1.11
    49129623c [1721563...@qq.com] [ZEPPELIN-4884] Add unit test for UDF. JavaLower.java & JavaUpper.java are used in UT.
    492679915 [1721563...@qq.com] [ZEPPELIN-4884] Different Flink versions return different comments, so change the assert clause
    3b8b91cf0 [1721563...@qq.com] [ZEPPELIN-4884] fix bug. Wrong content
    e79106716 [1721563...@qq.com] [ZEPPELIN-4884] fix bug. It should be executeSql instead of explainSql
    1a8e985fc [1721563...@qq.com] [ZEPPELIN-4884] Update when tbenv is created. Previously it threw an NPE at line 173 of FlinkSqlInterrpeter.java, because the sqlCommandParser was initialized with a null tbenv
    bfdfac8c2 [1721563...@qq.com] [ZEPPELIN-4884] Add support for 'DESC'
    fd0ff4300 [Jeff Zhang] [ZEPPELIN-4884]. Support all sql of flink 1.11
    
    (cherry picked from commit 6dfb97a46b6d59676784d4e976251e588a678508)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 flink/flink-shims/pom.xml                          |  18 ++
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  26 +++
 .../zeppelin/flink/sql/SqlCommandParser.java       |  93 +++++----
 .../org/apache/zeppelin/flink/Flink110Shims.java   |  68 ++++++-
 .../org/apache/zeppelin/flink/Flink111Shims.java   | 224 ++++++++++++++++++++-
 flink/interpreter/pom.xml                          |  12 --
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |   4 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   2 +
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java |  87 ++++----
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |   4 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java |   3 +-
 .../java/org/apache/zeppelin/flink/JavaLower.java  |  26 +++
 .../java/org/apache/zeppelin/flink/JavaUpper.java  |  25 +++
 .../apache/zeppelin/flink/SqlInterpreterTest.java  | 128 +++++++++++-
 14 files changed, 614 insertions(+), 106 deletions(-)
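
For context, a minimal sketch of what this change enables, mirroring the calls made in the updated SqlInterpreterTest near the end of the diff (the sqlInterpreter/context fields and getInterpreterContext() come from that test class; the table name in the DESC line is a placeholder, so this is illustrative, not part of the commit):

    InterpreterContext context = getInterpreterContext();
    // Catalog DDL now routes through the version-specific FlinkShims.executeSql
    sqlInterpreter.interpret("CREATE CATALOG test_catalog WITH('type'='generic_in_memory');", context);
    // Function DDL is newly supported on Flink 1.11
    sqlInterpreter.interpret("CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper';", context);
    // DESC is accepted as an alias of DESCRIBE
    sqlInterpreter.interpret("DESC source_table;", context);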

diff --git a/flink/flink-shims/pom.xml b/flink/flink-shims/pom.xml
index 5ca0568..188077c 100644
--- a/flink/flink-shims/pom.xml
+++ b/flink/flink-shims/pom.xml
@@ -33,6 +33,24 @@
     <packaging>jar</packaging>
     <name>Zeppelin: Flink Shims</name>
 
+    <properties>
+        <jline.version>3.9.0</jline.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.jline</groupId>
+            <artifactId>jline-terminal</artifactId>
+            <version>${jline.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jline</groupId>
+            <artifactId>jline-reader</artifactId>
+            <version>${jline.version}</version>
+        </dependency>
+    </dependencies>
+
     <build>
         <plugins>
             <plugin>
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 9eb600d..7918aab 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -18,7 +18,11 @@
 package org.apache.zeppelin.flink;
 
 
+import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,6 +30,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -75,6 +80,17 @@ public abstract class FlinkShims {
     return flinkShims;
   }
 
+  protected static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
+    return new AttributedStringBuilder()
+            .style(AttributedStyle.DEFAULT.bold())
+            .append(cmd.toString())
+            .append("\t\t")
+            .style(AttributedStyle.DEFAULT)
+            .append(description)
+            .append('\n')
+            .toAttributedString();
+  }
+
   public abstract Object createCatalogManager(Object config);
 
  public abstract String getPyFlinkPythonPath(Properties properties) throws IOException;
@@ -105,5 +121,15 @@ public abstract class FlinkShims {
 
  public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink);
 
+  public abstract Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt);
+
+  public abstract void executeSql(Object tableEnv, String sql);
+
+  public abstract String sqlHelp();
+
+  public abstract void setCatalogManagerSchemaResolver(Object catalogManager,
+                                                       Object parser,
+                                                       Object environmentSetting);
+
   public abstract Object getCustomCli(Object cliFrontend, Object commandLine);
 }
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
similarity index 85%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
rename to flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
index 3d3271e..d28d7f6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
@@ -18,44 +18,31 @@
 
 package org.apache.zeppelin.flink.sql;
 
+import org.apache.zeppelin.flink.FlinkShims;
+
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
  * Simple parser for determining the type of command and its parameters.
+ * All the SqlCommands should be put into this class, and the parsing logic needs to be put into FlinkShims
+ * because each version of flink has different sql syntax support.
  */
 public final class SqlCommandParser {
 
-  private SqlCommandParser() {
-    // private
-  }
+  private FlinkShims flinkShims;
+  private Object tableEnv;
 
-  public static Optional<SqlCommandCall> parse(String stmt) {
-    // normalize
-    stmt = stmt.trim();
-    // remove ';' at the end
-    if (stmt.endsWith(";")) {
-      stmt = stmt.substring(0, stmt.length() - 1).trim();
-    }
+  public SqlCommandParser(FlinkShims flinkShims, Object tableEnv) {
+    this.flinkShims = flinkShims;
+    this.tableEnv = tableEnv;
+  }
 
-    // parse
-    for (SqlCommand cmd : SqlCommand.values()) {
-      final Matcher matcher = cmd.pattern.matcher(stmt);
-      if (matcher.matches()) {
-        final String[] groups = new String[matcher.groupCount()];
-        for (int i = 0; i < groups.length; i++) {
-          groups[i] = matcher.group(i + 1);
-        }
-        final String sql = stmt;
-        return cmd.operandConverter.apply(groups)
-                .map((operands) -> new SqlCommandCall(cmd, operands, sql));
-      }
-    }
-    return Optional.empty();
+  public Optional<SqlCommandCall> parse(String stmt) {
+    return flinkShims.parseSql(tableEnv, stmt);
   }
 
  // --------------------------------------------------------------------------------------------
@@ -112,6 +99,14 @@ public final class SqlCommandParser {
             "USE\\s+(?!CATALOG)(.*)",
             SINGLE_OPERAND),
 
+    CREATE_CATALOG(null, SINGLE_OPERAND),
+
+    DROP_CATALOG(null, SINGLE_OPERAND),
+
+    DESC(
+            "DESC\\s+(.*)",
+            SINGLE_OPERAND),
+
     DESCRIBE(
             "DESCRIBE\\s+(.*)",
             SINGLE_OPERAND),
@@ -120,22 +115,30 @@ public final class SqlCommandParser {
             "EXPLAIN\\s+(.*)",
             SINGLE_OPERAND),
 
-    SELECT(
-            "(SELECT.*)",
+    CREATE_DATABASE(
+            "(CREATE\\s+DATABASE\\s+.*)",
             SINGLE_OPERAND),
 
-    INSERT_INTO(
-            "(INSERT\\s+INTO.*)",
+    DROP_DATABASE(
+            "(DROP\\s+DATABASE\\s+.*)",
             SINGLE_OPERAND),
 
-    INSERT_OVERWRITE(
-            "(INSERT\\s+OVERWRITE.*)",
+    ALTER_DATABASE(
+            "(ALTER\\s+DATABASE\\s+.*)",
             SINGLE_OPERAND),
 
     CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND),
 
     DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),
 
+    ALTER_TABLE(
+            "(ALTER\\s+TABLE\\s+.*)",
+            SINGLE_OPERAND),
+
+    DROP_VIEW(
+            "DROP\\s+VIEW\\s+(.*)",
+            SINGLE_OPERAND),
+
     CREATE_VIEW(
             "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
             (operands) -> {
@@ -145,24 +148,22 @@ public final class SqlCommandParser {
               return Optional.of(new String[]{operands[0], operands[1]});
             }),
 
-    CREATE_DATABASE(
-            "(CREATE\\s+DATABASE\\s+.*)",
-            SINGLE_OPERAND),
+    CREATE_FUNCTION(null, SINGLE_OPERAND),
 
-    DROP_DATABASE(
-            "(DROP\\s+DATABASE\\s+.*)",
-            SINGLE_OPERAND),
+    DROP_FUNCTION(null, SINGLE_OPERAND),
 
-    DROP_VIEW(
-            "DROP\\s+VIEW\\s+(.*)",
+    ALTER_FUNCTION(null, SINGLE_OPERAND),
+
+    SELECT(
+            "(SELECT.*)",
             SINGLE_OPERAND),
 
-    ALTER_DATABASE(
-            "(ALTER\\s+DATABASE\\s+.*)",
+    INSERT_INTO(
+            "(INSERT\\s+INTO.*)",
             SINGLE_OPERAND),
 
-    ALTER_TABLE(
-            "(ALTER\\s+TABLE\\s+.*)",
+    INSERT_OVERWRITE(
+            "(INSERT\\s+OVERWRITE.*)",
             SINGLE_OPERAND),
 
     SET(
@@ -188,7 +189,11 @@ public final class SqlCommandParser {
     public final Function<String[], Optional<String[]>> operandConverter;
 
    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
-      this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+      if (matchingRegex == null) {
+        this.pattern = null;
+      } else {
+        this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+      }
       this.operandConverter = operandConverter;
     }
 
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 1f7dd55..4479b6b 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -38,7 +38,11 @@ import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims111.Flink110ScalaShims;
+import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +51,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.file.Files;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.regex.Matcher;
 
 
 /**
@@ -56,6 +62,29 @@ import java.util.Properties;
 public class Flink110Shims extends FlinkShims {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(Flink110Shims.class);
+  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
+          .append("The following commands are available:\n\n")
+          .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+          .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+          .append(formatCommand(SqlCommandParser.SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+          .append(formatCommand(SqlCommandParser.SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.HELP, "Prints the available commands."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
+          .append(formatCommand(SqlCommandParser.SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+          .append(formatCommand(SqlCommandParser.SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+          .style(AttributedStyle.DEFAULT.underline())
+          .append("\nHint")
+          .style(AttributedStyle.DEFAULT)
+          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+          .toAttributedString();
 
   public Flink110Shims(Properties properties) {
     super(properties);
@@ -67,7 +96,6 @@ public class Flink110Shims extends FlinkShims {
             new GenericInMemoryCatalog("default_catalog", "default_database"));
   }
 
-
   @Override
  public String getPyFlinkPythonPath(Properties properties) throws IOException {
     String flinkHome = System.getenv("FLINK_HOME");
@@ -151,6 +179,44 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
+  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
+    // parse
+    for (SqlCommandParser.SqlCommand cmd : SqlCommandParser.SqlCommand.values()) {
+      if (cmd.pattern == null) {
+        continue;
+      }
+      final Matcher matcher = cmd.pattern.matcher(stmt);
+      if (matcher.matches()) {
+        final String[] groups = new String[matcher.groupCount()];
+        for (int i = 0; i < groups.length; i++) {
+          groups[i] = matcher.group(i + 1);
+        }
+        final String sql = stmt;
+        return cmd.operandConverter.apply(groups)
+                .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands, sql));
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void executeSql(Object tableEnv, String sql) {
+    throw new RuntimeException("Should not be called for flink 1.10");
+  }
+
+  @Override
+  public String sqlHelp() {
+    return MESSAGE_HELP.toString();
+  }
+
+  @Override
+  public void setCatalogManagerSchemaResolver(Object catalogManager,
+                                              Object parser,
+                                              Object environmentSetting) {
+    // do nothing for flink 1.10
+  }
+  
+  @Override
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
    return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) commandLine);
   }
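
The 1.10 shim keeps the old regex-only behavior; a hedged sketch of the resulting contract (the constructor and method signatures are taken from the diff above, while the try/catch framing is illustrative):

    FlinkShims shims110 = new Flink110Shims(new Properties());
    // Regex parsing still works; the tableEnv argument is unused on 1.10.
    shims110.parseSql(null, "SHOW TABLES");
    try {
      // DDL that needs executeSql is deliberately rejected on 1.10.
      shims110.executeSql(null, "CREATE CATALOG c1 WITH('type'='generic_in_memory')");
    } catch (RuntimeException expected) {
      // "Should not be called for flink 1.10"
    }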
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index f6d929a..d1b89fd 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -27,22 +27,58 @@ import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.operations.DescribeTableOperation;
+import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.ShowCatalogsOperation;
+import org.apache.flink.table.operations.ShowDatabasesOperation;
+import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.UseCatalogOperation;
+import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
+import org.apache.flink.table.operations.ddl.AlterTableOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateViewOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropCatalogOperation;
+import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
+import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims111.Flink111ScalaShims;
+import org.apache.zeppelin.flink.sql.SqlCommandParser;
+import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
+import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +88,10 @@ import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
 
 
 /**
@@ -62,6 +100,29 @@ import java.util.concurrent.ConcurrentHashMap;
 public class Flink111Shims extends FlinkShims {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class);
+  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
+          .append("The following commands are available:\n\n")
+          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
+          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
+          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
+          .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
+          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
+          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
+          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
+          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+          .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+          .style(AttributedStyle.DEFAULT.underline())
+          .append("\nHint")
+          .style(AttributedStyle.DEFAULT)
+          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+          .toAttributedString();
 
  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
 
@@ -120,7 +181,7 @@ public class Flink111Shims extends FlinkShims {
   @Override
  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while(!jobClient.getJobStatus().get().isTerminalState()) {
+    while (!jobClient.getJobStatus().get().isTerminalState()) {
       LOGGER.debug("Wait for job to finish");
       Thread.sleep(1000 * 5);
     }
@@ -158,17 +219,172 @@ public class Flink111Shims extends FlinkShims {
 
   @Override
  public void registerTableFunction(Object btenv, String name, Object tableFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction);
+    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction);
  }
 
  @Override
  public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction);
+    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction);
  }
 
  @Override
  public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) {
-    ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
+    ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction);
+  }
+
+  /**
+   * Parse the statement via regex matching first, then fall back to Flink's SqlParser.
+   *
+   * @param tableEnv
+   * @param stmt
+   * @return
+   */
+  @Override
+  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
+    Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
+    SqlCommandCall sqlCommandCall = null;
+    try {
+      // parse statement via regex matching first
+      Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
+      if (callOpt.isPresent()) {
+        sqlCommandCall = callOpt.get();
+      } else {
+        sqlCommandCall = parseBySqlParser(sqlParser, stmt);
+      }
+    } catch (Exception e) {
+      return Optional.empty();
+    }
+    return Optional.of(sqlCommandCall);
+
+  }
+
+  private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
+    List<Operation> operations;
+    try {
+      operations = sqlParser.parse(stmt);
+    } catch (Throwable e) {
+      throw new Exception("Invalid SQL statement.", e);
+    }
+    if (operations.size() != 1) {
+      throw new Exception("Only single statement is supported now.");
+    }
+
+    final SqlCommand cmd;
+    String[] operands = new String[]{stmt};
+    Operation operation = operations.get(0);
+    if (operation instanceof CatalogSinkModifyOperation) {
+      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
+      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
+    } else if (operation instanceof CreateTableOperation) {
+      cmd = SqlCommand.CREATE_TABLE;
+    } else if (operation instanceof DropTableOperation) {
+      cmd = SqlCommand.DROP_TABLE;
+    } else if (operation instanceof AlterTableOperation) {
+      cmd = SqlCommand.ALTER_TABLE;
+    } else if (operation instanceof CreateViewOperation) {
+      cmd = SqlCommand.CREATE_VIEW;
+    } else if (operation instanceof DropViewOperation) {
+      cmd = SqlCommand.DROP_VIEW;
+    } else if (operation instanceof CreateDatabaseOperation) {
+      cmd = SqlCommand.CREATE_DATABASE;
+    } else if (operation instanceof DropDatabaseOperation) {
+      cmd = SqlCommand.DROP_DATABASE;
+    } else if (operation instanceof AlterDatabaseOperation) {
+      cmd = SqlCommand.ALTER_DATABASE;
+    } else if (operation instanceof CreateCatalogOperation) {
+      cmd = SqlCommand.CREATE_CATALOG;
+    } else if (operation instanceof DropCatalogOperation) {
+      cmd = SqlCommand.DROP_CATALOG;
+    } else if (operation instanceof UseCatalogOperation) {
+      cmd = SqlCommand.USE_CATALOG;
+      operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
+    } else if (operation instanceof UseDatabaseOperation) {
+      cmd = SqlCommand.USE;
+      operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
+    } else if (operation instanceof ShowCatalogsOperation) {
+      cmd = SqlCommand.SHOW_CATALOGS;
+      operands = new String[0];
+    } else if (operation instanceof ShowDatabasesOperation) {
+      cmd = SqlCommand.SHOW_DATABASES;
+      operands = new String[0];
+    } else if (operation instanceof ShowTablesOperation) {
+      cmd = SqlCommand.SHOW_TABLES;
+      operands = new String[0];
+    } else if (operation instanceof ShowFunctionsOperation) {
+      cmd = SqlCommand.SHOW_FUNCTIONS;
+      operands = new String[0];
+    } else if (operation instanceof CreateCatalogFunctionOperation ||
+            operation instanceof CreateTempSystemFunctionOperation) {
+      cmd = SqlCommand.CREATE_FUNCTION;
+    } else if (operation instanceof DropCatalogFunctionOperation ||
+            operation instanceof DropTempSystemFunctionOperation) {
+      cmd = SqlCommand.DROP_FUNCTION;
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      cmd = SqlCommand.ALTER_FUNCTION;
+    } else if (operation instanceof ExplainOperation) {
+      cmd = SqlCommand.EXPLAIN;
+    } else if (operation instanceof DescribeTableOperation) {
+      cmd = SqlCommand.DESCRIBE;
+      operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
+    } else if (operation instanceof QueryOperation) {
+      cmd = SqlCommand.SELECT;
+    } else {
+      throw new Exception("Unknown operation: " + operation.asSummaryString());
+    }
+
+    return new SqlCommandCall(cmd, operands, stmt);
+  }
+
+  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
+    // parse statement via regex matching
+    for (SqlCommand cmd : SqlCommand.values()) {
+      if (cmd.pattern != null) {
+        final Matcher matcher = cmd.pattern.matcher(stmt);
+        if (matcher.matches()) {
+          final String[] groups = new String[matcher.groupCount()];
+          for (int i = 0; i < groups.length; i++) {
+            groups[i] = matcher.group(i + 1);
+          }
+          return cmd.operandConverter.apply(groups)
+                  .map((operands) -> {
+                    String[] newOperands = operands;
+                    if (cmd == SqlCommand.EXPLAIN) {
+                      // convert `explain xx` to `explain plan for xx`
+                      // which can execute through executeSql method
+                      newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
+                    }
+                    return new SqlCommandCall(cmd, newOperands, stmt);
+                  });
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public void executeSql(Object tableEnv, String sql) {
+    ((TableEnvironment) tableEnv).executeSql(sql);
+  }
+
+  @Override
+  public String sqlHelp() {
+    return MESSAGE_HELP.toString();
+  }
+
+  /**
+   * Flink 1.11 binds the CatalogManager to the parser, so the blink and flink planners cannot share the same CatalogManager.
+   * As a workaround, always reset the CatalogTableSchemaResolver before running any flink code.
+   * @param catalogManager
+   * @param parserObject
+   * @param environmentSetting
+   */
+  @Override
+  public void setCatalogManagerSchemaResolver(Object catalogManager,
+                                              Object parserObject,
+                                              Object environmentSetting) {
+    ((CatalogManager) catalogManager).setCatalogTableSchemaResolver(
+            new CatalogTableSchemaResolver((Parser) parserObject,
+                    ((EnvironmentSettings) environmentSetting).isStreamingMode()));
   }
 
   @Override
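
A hedged sketch of the 1.11 parse order implemented above (it assumes a live Flink 1.11 FlinkShims instance and TableEnvironment, neither of which is constructed here, and the statement strings are just examples):

    static void demo(FlinkShims shims, Object tableEnv) {
      // Regex matching runs first, covering SELECT, INSERT, SET, and friends.
      Optional<SqlCommandParser.SqlCommandCall> select =
              shims.parseSql(tableEnv, "SELECT * FROM source_table");
      // Statements with no regex pattern fall through to the shim's
      // TableEnvironmentInternal#getParser, so catalog/function DDL only
      // resolves on Flink 1.11.
      Optional<SqlCommandParser.SqlCommandCall> ddl =
              shims.parseSql(tableEnv, "CREATE CATALOG c1 WITH ('type'='generic_in_memory')");
    }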
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index 5d6d41b..83e2c01 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -145,18 +145,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.jline</groupId>
-      <artifactId>jline-terminal</artifactId>
-      <version>3.9.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.jline</groupId>
-      <artifactId>jline-reader</artifactId>
-      <version>3.9.0</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-python_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index f397187..f48dcb5 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -41,9 +41,11 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter {
 
   @Override
   public void open() throws InterpreterException {
-    super.open();
+    this.flinkInterpreter =
+            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
     this.tbenv = flinkInterpreter.getJavaBatchTableEnvironment("blink");
     this.z = flinkInterpreter.getZeppelinContext();
+    super.open();
   }
 
   @Override
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 244559f..d624c43 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.Planner;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index f2d31b6..659bf09 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -61,32 +61,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
  protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterrpeter.class);
 
-  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
-          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
-          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
-          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
-          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
-          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-          .toAttributedString();
-
   protected FlinkInterpreter flinkInterpreter;
   protected TableEnvironment tbenv;
+  private SqlCommandParser sqlCommandParser;
   private SqlSplitter sqlSplitter;
   private int defaultSqlParallelism;
  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
@@ -105,8 +82,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
 
   @Override
   public void open() throws InterpreterException {
-    flinkInterpreter =
-            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+    sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv);
     this.sqlSplitter = new SqlSplitter();
     JobListener jobListener = new JobListener() {
       @Override
@@ -186,11 +162,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       List<String> sqls = sqlSplitter.splitSql(st);
       boolean isFirstInsert = true;
       for (String sql : sqls) {
-        Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
+        Optional<SqlCommandParser.SqlCommandCall> sqlCommand = sqlCommandParser.parse(sql);
         if (!sqlCommand.isPresent()) {
           try {
             context.out.write("%text Invalid Sql statement: " + sql + "\n");
-            context.out.write(MESSAGE_HELP.toString());
+            context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
           } catch (IOException e) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
           }
@@ -275,6 +251,15 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       case SOURCE:
         callSource(cmdCall.operands[0], context);
         break;
+      case CREATE_FUNCTION:
+        callCreateFunction(cmdCall.operands[0], context);
+        break;
+      case DROP_FUNCTION:
+        callDropFunction(cmdCall.operands[0], context);
+        break;
+      case ALTER_FUNCTION:
+        callAlterFunction(cmdCall.operands[0], context);
+        break;
       case SHOW_FUNCTIONS:
         callShowFunctions(context);
         break;
@@ -287,6 +272,13 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       case USE:
         callUseDatabase(cmdCall.operands[0], context);
         break;
+      case CREATE_CATALOG:
+        callCreateCatalog(cmdCall.operands[0], context);
+        break;
+      case DROP_CATALOG:
+        callDropCatalog(cmdCall.operands[0], context);
+        break;
+      case DESC:
       case DESCRIBE:
         callDescribe(cmdCall.operands[0], context);
         break;
@@ -419,17 +411,27 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     context.out.write("Table has been dropped.\n");
   }
 
-  private void callUseCatalog(String catalog, InterpreterContext context) {
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
     this.tbenv.useCatalog(catalog);
   }
 
+  private void callCreateCatalog(String sql, InterpreterContext context) throws IOException {
+    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
+    context.out.write("Catalog has been created.\n");
+  }
+
+  private void callDropCatalog(String sql, InterpreterContext context) throws IOException {
+    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
+    context.out.write("Catalog has been dropped.\n");
+  }
+
   private void callShowModules(InterpreterContext context) throws IOException {
     String[] modules = this.tbenv.listModules();
     context.out.write("%table module\n" + StringUtils.join(modules, "\n") + 
"\n");
   }
 
   private void callHelp(InterpreterContext context) throws IOException {
-    context.out.write(MESSAGE_HELP.toString());
+    context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
   }
 
  private void callShowCatalogs(InterpreterContext context) throws IOException {
@@ -456,6 +458,21 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     runSqlList(sql, context);
   }
 
+  private void callCreateFunction(String sql, InterpreterContext context) throws IOException {
+    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
+    context.out.write("Function has been created.\n");
+  }
+
+  private void callDropFunction(String sql, InterpreterContext context) throws IOException {
+    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
+    context.out.write("Function has been dropped.\n");
+  }
+
+  private void callAlterFunction(String sql, InterpreterContext context) throws IOException {
+    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
+    context.out.write("Function has been modified.\n");
+  }
+
  private void callShowFunctions(InterpreterContext context) throws IOException {
     String[] functions = this.tbenv.listUserDefinedFunctions();
     context.out.write(
@@ -553,14 +570,4 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     this.flinkInterpreter.cancel(context);
   }
 
-  private static AttributedString formatCommand(SqlCommand cmd, String description) {
-    return new AttributedStringBuilder()
-            .style(AttributedStyle.DEFAULT.bold())
-            .append(cmd.toString())
-            .append("\t\t")
-            .style(AttributedStyle.DEFAULT)
-            .append(description)
-            .append('\n')
-            .toAttributedString();
-  }
 }
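
All of the new call* methods above share one shape; condensed as a sketch (this helper itself is illustrative, not part of the commit):

    private void callDdl(String sql, String confirmation, InterpreterContext context)
            throws IOException {
      // Version-specific execution: the 1.11 shim delegates to
      // TableEnvironment#executeSql, the 1.10 shim throws.
      flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
      context.out.write(confirmation + "\n");
    }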
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index ab05526..c027341 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -43,8 +43,10 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
 
   @Override
   public void open() throws InterpreterException {
-    super.open();
+    this.flinkInterpreter =
+            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
     this.tbenv = flinkInterpreter.getJavaStreamTableEnvironment("blink");
+    super.open();
   }
 
   @Override
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 75d06bb..b514f49 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -354,13 +354,14 @@ public class TableEnvFactory {
     Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv());
 
     Map<String, String> plannerProperties = settings.toPlannerProperties();
-    ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+    Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
             .create(
                     plannerProperties,
                     executor,
                     tblConfig,
                     blinkFunctionCatalog,
                     catalogManager);
+    this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings);
   }
 
   private static Executor lookupExecutor(
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java
new file mode 100644
index 0000000..483910e
--- /dev/null
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class JavaLower extends ScalarFunction {
+  public String eval(String str) {
+    return str.toLowerCase();
+  }
+}
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java
new file mode 100644
index 0000000..d444084
--- /dev/null
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class JavaUpper extends ScalarFunction {
+  public String eval(String str) {
+    return str.toUpperCase();
+  }
+}
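
JavaLower and JavaUpper are plain ScalarFunctions; the tests below register them through the new function DDL path instead of the Java API. For contrast, a sketch of both routes (the programmatic call assumes a Flink 1.11 TableEnvironment named tableEnv):

    // New DDL route, handled by FlinkShims.executeSql:
    //   CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper'
    // Pre-existing programmatic route, for comparison:
    tableEnv.createTemporarySystemFunction("myudf", JavaUpper.class);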
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 94a9edc..f84564a 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -64,6 +64,7 @@ import static org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import static org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 
 
@@ -336,7 +337,7 @@ public abstract class SqlInterpreterTest {
     assertEquals(Type.TEXT, resultMessages.get(0).getType());
    assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("already exists"));
 
-    // show tables
+    // show view
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("show tables", context);
     assertEquals(Code.SUCCESS, result.code());
@@ -344,7 +345,7 @@ public abstract class SqlInterpreterTest {
     assertEquals(Type.TABLE, resultMessages.get(0).getType());
     assertEquals("table\nmy_view\nsource_table\n", 
resultMessages.get(0).getData());
 
-    // drop table
+    // drop view
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("drop view my_view", context);
     assertEquals(Code.SUCCESS, result.code());
@@ -368,6 +369,129 @@ public abstract class SqlInterpreterTest {
             resultMessages.get(0).getData().contains("The following commands 
are available"));
   }
 
+
+  @Test
+  public void testFunction() throws IOException, InterpreterException {
+    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+    if (!flinkVersion.isFlink110()) {
+      InterpreterContext context = getInterpreterContext();
+
+      // CREATE UDF
+      InterpreterResult result = sqlInterpreter.interpret(
+              "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' ;", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("Function has been created."));
+
+      // SHOW UDF
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret(
+              "SHOW FUNCTIONS ;", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("myudf"));
+
+      // ALTER
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret(
+              "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("Function has been modified."));
+
+      // DROP UDF
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("Function has been dropped."));
+
+      // SHOW UDF. Due to the DROP FUNCTION above, it shouldn't contain 'myudf'
+      result = sqlInterpreter.interpret(
+              "SHOW FUNCTIONS ;", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertFalse(resultMessages.toString(), resultMessages.get(0).getData().contains("myudf"));
+    } else {
+      // Flink 1.10 doesn't support DDL for functions
+      assertTrue(flinkVersion.isFlink110());
+    }
+  }
+
+  @Test
+  public void testCatalog() throws IOException, InterpreterException {
+    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+
+    if (!flinkVersion.isFlink110()) {
+      InterpreterContext context = getInterpreterContext();
+
+      // CREATE CATALOG
+      InterpreterResult result = sqlInterpreter.interpret(
+              "CREATE CATALOG test_catalog \n" +
+                      "WITH( \n" +
+                      "'type'='generic_in_memory' \n" +
+                      ");", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("Catalog has been created."));
+
+      // USE CATALOG & SHOW DATABASES
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret(
+              "USE CATALOG test_catalog ;\n" +
+                      "SHOW DATABASES;", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("default"));
+
+      // DROP CATALOG
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret(
+              "DROP CATALOG test_catalog ;\n", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("Catalog has been dropped."));
+
+      // SHOW CATALOGS. Due to the DROP CATALOG above, it shouldn't contain 'test_catalog'
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret(
+              "SHOW CATALOGS ;\n", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("default_catalog"));
+      assertFalse(resultMessages.toString(), resultMessages.get(0).getData().contains("test_catalog"));
+    } else {
+      // Flink 1.10 doesn't support DDL for catalogs
+      assertTrue(flinkVersion.isFlink110());
+    }
+  }
+
+  @Test
+  public void testShowModules() throws InterpreterException, IOException {
+    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+
+    if (!flinkVersion.isFlink110()) {
+      InterpreterContext context = getInterpreterContext();
+
+      // SHOW MODULES
+      InterpreterResult result = sqlInterpreter.interpret(
+              "show modules", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+      List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+      assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core"));
+    } else {
+      // Flink 1.10 doesn't support show modules
+      assertTrue(flinkVersion.isFlink110());
+    }
+  }
+
+
   protected InterpreterContext getInterpreterContext() {
     InterpreterContext context = InterpreterContext.builder()
             .setParagraphId("paragraphId")
