This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 6dfb97a [ZEPPELIN-4884] Support all sql of flink 1.11 6dfb97a is described below commit 6dfb97a46b6d59676784d4e976251e588a678508 Author: dijie <nj18652727...@gmail.com> AuthorDate: Sat Jul 11 11:12:54 2020 +0800 [ZEPPELIN-4884] Support all sql of flink 1.11 ### What is this PR for? Support all sql of flink 1.11 ### What type of PR is it? [ Feature ] ### Todos * [ ] - Task ### What is the Jira issue? * [ZEPPELIN-4884] https://issues.apache.org/jira/browse/ZEPPELIN-4884 ### How should this be tested? * travis url https://travis-ci.org/github/lonelyGhostisdog/zeppelin * Unit Tests & GUI Tests ### Screenshots (if appropriate) ### Questions: * No. Author: dijie <nj18652727...@gmail.com> Author: Jeff Zhang <zjf...@apache.org> Author: 1721563...@qq.com <a18652727118> Closes #3842 from lonelyGhostisdog/ZEPPELIN-4884 and squashes the following commits: 4dea34e17 [dijie] [ZEPPELIN-4884] fix bug. 962d92885 [dijie] [ZEPPELIN-4884] fix NPE & resolve conflict add55c5a2 [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/lonelyGhostisdog/zeppelin into ZEPPELIN-4884 3f4074839 [dijie] Merge branch 'master' into ZEPPELIN-4884 dd53e9a0d [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884 54cf5a16a [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884 2b3157f99 [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/zjffdu/zeppelin into ZEPPELIN-4884 1e37860af [Jeff Zhang] set CatalogTableSchema resolver before running code 193567bff [Jeff Zhang] address NPE ba8ecc501 [Jeff Zhang] [ZEPPELIN-4884]. 
Support all sql of flink 1.11 4ecb231c2 [dijie] [ZEPPELIN-4884] add UT for flink 1.11 0267de4cc [dijie] [ZEPPELIN-4884] fix NPE 372b61f2d [dijie] Merge branch 'ZEPPELIN-4884' of https://github.com/zjffdu/zeppelin into ZEPPELIN-4884 1c7128378 [dijie] Merge branch 'master' of https://github.com/apache/zeppelin into ZEPPELIN-4884 71d237380 [Jeff Zhang] address NPE 9786ea116 [Jeff Zhang] [ZEPPELIN-4884]. Support all sql of flink 1.11 49129623c [1721563...@qq.com] [ZEPPELIN-4884] Add unit test for UDF. JavaLower.java & JavaUpper.java are used in UT. 492679915 [1721563...@qq.com] [ZEPPELIN-4884] Due to different Flink version return different comment , so change the assert clause 3b8b91cf0 [1721563...@qq.com] [ZEPPELIN-4884] fix bug. Wrong content e79106716 [1721563...@qq.com] [ZEPPELIN-4884] fix bug. it should be executeSql instead of explainSql 1a8e985fc [1721563...@qq.com] [ZEPPELIN-4884] Update tbenv created timing . In past , it throw NPE in 173 line of FlinkSqlInterrpeter.java. Because when initialized the sqlCommandParser , it's given a null tbenv bfdfac8c2 [1721563...@qq.com] [ZEPPELIN-4884] Add support for 'DESC' fd0ff4300 [Jeff Zhang] [ZEPPELIN-4884]. 
Support all sql of flink 1.11 --- flink/flink-shims/pom.xml | 18 ++ .../java/org/apache/zeppelin/flink/FlinkShims.java | 26 +++ .../zeppelin/flink/sql/SqlCommandParser.java | 93 +++++---- .../org/apache/zeppelin/flink/Flink110Shims.java | 68 ++++++- .../org/apache/zeppelin/flink/Flink111Shims.java | 224 ++++++++++++++++++++- flink/interpreter/pom.xml | 12 -- .../zeppelin/flink/FlinkBatchSqlInterpreter.java | 4 +- .../apache/zeppelin/flink/FlinkInterpreter.java | 2 + .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 87 ++++---- .../zeppelin/flink/FlinkStreamSqlInterpreter.java | 4 +- .../org/apache/zeppelin/flink/TableEnvFactory.java | 3 +- .../java/org/apache/zeppelin/flink/JavaLower.java | 26 +++ .../java/org/apache/zeppelin/flink/JavaUpper.java | 25 +++ .../apache/zeppelin/flink/SqlInterpreterTest.java | 128 +++++++++++- 14 files changed, 614 insertions(+), 106 deletions(-) diff --git a/flink/flink-shims/pom.xml b/flink/flink-shims/pom.xml index 5ca0568..188077c 100644 --- a/flink/flink-shims/pom.xml +++ b/flink/flink-shims/pom.xml @@ -33,6 +33,24 @@ <packaging>jar</packaging> <name>Zeppelin: Flink Shims</name> + <properties> + <jline.version>3.9.0</jline.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.jline</groupId> + <artifactId>jline-terminal</artifactId> + <version>${jline.version}</version> + </dependency> + + <dependency> + <groupId>org.jline</groupId> + <artifactId>jline-reader</artifactId> + <version>${jline.version}</version> + </dependency> + </dependencies> + <build> <plugins> <plugin> diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index 9eb600d..7918aab 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -18,7 +18,11 @@ package org.apache.zeppelin.flink; +import 
org.apache.zeppelin.flink.sql.SqlCommandParser; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +30,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.net.InetAddress; import java.util.List; +import java.util.Optional; import java.util.Properties; /** @@ -75,6 +80,17 @@ public abstract class FlinkShims { return flinkShims; } + protected static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) { + return new AttributedStringBuilder() + .style(AttributedStyle.DEFAULT.bold()) + .append(cmd.toString()) + .append("\t\t") + .style(AttributedStyle.DEFAULT) + .append(description) + .append('\n') + .toAttributedString(); + } + public abstract Object createCatalogManager(Object config); public abstract String getPyFlinkPythonPath(Properties properties) throws IOException; @@ -105,5 +121,15 @@ public abstract class FlinkShims { public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink); + public abstract Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt); + + public abstract void executeSql(Object tableEnv, String sql); + + public abstract String sqlHelp(); + + public abstract void setCatalogManagerSchemaResolver(Object catalogManager, + Object parser, + Object environmentSetting); + public abstract Object getCustomCli(Object cliFrontend, Object commandLine); } diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java similarity index 85% rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java rename to 
flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java index 3d3271e..d28d7f6 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java @@ -18,44 +18,31 @@ package org.apache.zeppelin.flink.sql; +import org.apache.zeppelin.flink.FlinkShims; + import java.util.Arrays; import java.util.Objects; import java.util.Optional; import java.util.function.Function; -import java.util.regex.Matcher; import java.util.regex.Pattern; /** * Simple parser for determining the type of command and its parameters. + * All the SqlCommands should be put into this class, and the parsing logic needs to be put into FlinkShims + * because each version of flink has different sql syntax support. */ public final class SqlCommandParser { - private SqlCommandParser() { - // private - } + private FlinkShims flinkShims; + private Object tableEnv; - public static Optional<SqlCommandCall> parse(String stmt) { - // normalize - stmt = stmt.trim(); - // remove ';' at the end - if (stmt.endsWith(";")) { - stmt = stmt.substring(0, stmt.length() - 1).trim(); - } + public SqlCommandParser(FlinkShims flinkShims, Object tableEnv) { + this.flinkShims = flinkShims; + this.tableEnv = tableEnv; + } - // parse - for (SqlCommand cmd : SqlCommand.values()) { - final Matcher matcher = cmd.pattern.matcher(stmt); - if (matcher.matches()) { - final String[] groups = new String[matcher.groupCount()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = matcher.group(i + 1); - } - final String sql = stmt; - return cmd.operandConverter.apply(groups) - .map((operands) -> new SqlCommandCall(cmd, operands, sql)); - } - } - return Optional.empty(); + public Optional<SqlCommandCall> parse(String stmt) { + return flinkShims.parseSql(tableEnv, stmt); } // -------------------------------------------------------------------------------------------- @@ -112,6 +99,14 @@ 
public final class SqlCommandParser { "USE\\s+(?!CATALOG)(.*)", SINGLE_OPERAND), + CREATE_CATALOG(null, SINGLE_OPERAND), + + DROP_CATALOG(null, SINGLE_OPERAND), + + DESC( + "DESC\\s+(.*)", + SINGLE_OPERAND), + DESCRIBE( "DESCRIBE\\s+(.*)", SINGLE_OPERAND), @@ -120,22 +115,30 @@ public final class SqlCommandParser { "EXPLAIN\\s+(.*)", SINGLE_OPERAND), - SELECT( - "(SELECT.*)", + CREATE_DATABASE( + "(CREATE\\s+DATABASE\\s+.*)", SINGLE_OPERAND), - INSERT_INTO( - "(INSERT\\s+INTO.*)", + DROP_DATABASE( + "(DROP\\s+DATABASE\\s+.*)", SINGLE_OPERAND), - INSERT_OVERWRITE( - "(INSERT\\s+OVERWRITE.*)", + ALTER_DATABASE( + "(ALTER\\s+DATABASE\\s+.*)", SINGLE_OPERAND), CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND), DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND), + ALTER_TABLE( + "(ALTER\\s+TABLE\\s+.*)", + SINGLE_OPERAND), + + DROP_VIEW( + "DROP\\s+VIEW\\s+(.*)", + SINGLE_OPERAND), + CREATE_VIEW( "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)", (operands) -> { @@ -145,24 +148,22 @@ public final class SqlCommandParser { return Optional.of(new String[]{operands[0], operands[1]}); }), - CREATE_DATABASE( - "(CREATE\\s+DATABASE\\s+.*)", - SINGLE_OPERAND), + CREATE_FUNCTION(null, SINGLE_OPERAND), - DROP_DATABASE( - "(DROP\\s+DATABASE\\s+.*)", - SINGLE_OPERAND), + DROP_FUNCTION(null, SINGLE_OPERAND), - DROP_VIEW( - "DROP\\s+VIEW\\s+(.*)", + ALTER_FUNCTION(null, SINGLE_OPERAND), + + SELECT( + "(SELECT.*)", SINGLE_OPERAND), - ALTER_DATABASE( - "(ALTER\\s+DATABASE\\s+.*)", + INSERT_INTO( + "(INSERT\\s+INTO.*)", SINGLE_OPERAND), - ALTER_TABLE( - "(ALTER\\s+TABLE\\s+.*)", + INSERT_OVERWRITE( + "(INSERT\\s+OVERWRITE.*)", SINGLE_OPERAND), SET( @@ -188,7 +189,11 @@ public final class SqlCommandParser { public final Function<String[], Optional<String[]>> operandConverter; SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) { - this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS); + if (matchingRegex == null) { + this.pattern = null; 
+ } else { + this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS); + } this.operandConverter = operandConverter; } diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java index 1f7dd55..4479b6b 100644 --- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java +++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java @@ -38,7 +38,11 @@ import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; import org.apache.zeppelin.flink.shims111.Flink110ScalaShims; +import org.apache.zeppelin.flink.sql.SqlCommandParser; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +51,9 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.file.Files; import java.util.List; +import java.util.Optional; import java.util.Properties; +import java.util.regex.Matcher; /** @@ -56,6 +62,29 @@ import java.util.Properties; public class Flink110Shims extends FlinkShims { private static final Logger LOGGER = LoggerFactory.getLogger(Flink110Shims.class); + public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) + .append(formatCommand(SqlCommandParser.SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) + .append(formatCommand(SqlCommandParser.SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. 
Syntax: 'CREATE VIEW <name> AS <query>;'")) + .append(formatCommand(SqlCommandParser.SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) + .append(formatCommand(SqlCommandParser.SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) + .append(formatCommand(SqlCommandParser.SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) + .append(formatCommand(SqlCommandParser.SqlCommand.HELP, "Prints the available commands.")) + .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) + .append(formatCommand(SqlCommandParser.SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) + .append(formatCommand(SqlCommandParser.SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) + .append(formatCommand(SqlCommandParser.SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.")) + .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) + .append(formatCommand(SqlCommandParser.SqlCommand.SHOW_TABLES, "Shows all registered tables.")) + .append(formatCommand(SqlCommandParser.SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) + .append(formatCommand(SqlCommandParser.SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) + .append(formatCommand(SqlCommandParser.SqlCommand.USE, "Sets the current default database. Experimental! 
Syntax: 'USE <name>;'")) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") + .toAttributedString(); public Flink110Shims(Properties properties) { super(properties); @@ -67,7 +96,6 @@ public class Flink110Shims extends FlinkShims { new GenericInMemoryCatalog("default_catalog", "default_database")); } - @Override public String getPyFlinkPythonPath(Properties properties) throws IOException { String flinkHome = System.getenv("FLINK_HOME"); @@ -151,6 +179,44 @@ public class Flink110Shims extends FlinkShims { } @Override + public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) { + // parse + for (SqlCommandParser.SqlCommand cmd : SqlCommandParser.SqlCommand.values()) { + if (cmd.pattern == null){ + continue; + } + final Matcher matcher = cmd.pattern.matcher(stmt); + if (matcher.matches()) { + final String[] groups = new String[matcher.groupCount()]; + for (int i = 0; i < groups.length; i++) { + groups[i] = matcher.group(i + 1); + } + final String sql = stmt; + return cmd.operandConverter.apply(groups) + .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands, sql)); + } + } + return Optional.empty(); + } + + @Override + public void executeSql(Object tableEnv, String sql) { + throw new RuntimeException("Should not be called for flink 1.10"); + } + + @Override + public String sqlHelp() { + return MESSAGE_HELP.toString(); + } + + @Override + public void setCatalogManagerSchemaResolver(Object catalogManager, + Object parser, + Object environmentSetting) { + // do nothing for flink 1.10 + } + + @Override public Object getCustomCli(Object cliFrontend, Object commandLine) { return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) commandLine); } diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java index f6d929a..d1b89fd 100644 --- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java +++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java @@ -27,22 +27,58 @@ import org.apache.flink.api.scala.DataSet; import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.api.internal.CatalogTableSchemaResolver; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.DescribeTableOperation; +import org.apache.flink.table.operations.ExplainOperation; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowDatabasesOperation; +import org.apache.flink.table.operations.ShowFunctionsOperation; +import org.apache.flink.table.operations.ShowTablesOperation; +import org.apache.flink.table.operations.UseCatalogOperation; +import 
org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterTableOperation; +import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; +import org.apache.flink.table.operations.ddl.CreateCatalogOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; +import org.apache.flink.table.operations.ddl.CreateViewOperation; +import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; +import org.apache.flink.table.operations.ddl.DropCatalogOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; +import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; +import org.apache.flink.table.operations.ddl.DropViewOperation; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.zeppelin.flink.shims111.CollectStreamTableSink; import org.apache.zeppelin.flink.shims111.Flink111ScalaShims; +import org.apache.zeppelin.flink.sql.SqlCommandParser; +import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand; +import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +88,10 @@ import java.net.InetAddress; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; 
import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; /** @@ -62,6 +100,29 @@ import java.util.concurrent.ConcurrentHashMap; public class Flink111Shims extends FlinkShims { private static final Logger LOGGER = LoggerFactory.getLogger(Flink111Shims.class); + public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) + .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) + .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'")) + .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) + .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) + .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) + .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) + .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) + .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) + .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) + .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. 
Use 'SET;' for listing all properties.")) + .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) + .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) + .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) + .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) + .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'")) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") + .toAttributedString(); private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>(); @@ -120,7 +181,7 @@ public class Flink111Shims extends FlinkShims { @Override public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception { JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get(); - while(!jobClient.getJobStatus().get().isTerminalState()) { + while (!jobClient.getJobStatus().get().isTerminalState()) { LOGGER.debug("Wait for job to finish"); Thread.sleep(1000 * 5); } @@ -158,17 +219,172 @@ public class Flink111Shims extends FlinkShims { @Override public void registerTableFunction(Object btenv, String name, Object tableFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableFunction) tableFunction); + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); } @Override public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); + 
((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); } @Override public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { - ((StreamTableEnvironmentImpl)(btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + } + + /** + * Parse it via regular expression matching first, then fall back to flink SqlParser. + * + * @param tableEnv + * @param stmt + * @return + */ + @Override + public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) { + Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser(); + SqlCommandCall sqlCommandCall = null; + try { + // parse statement via regex matching first + Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt); + if (callOpt.isPresent()) { + sqlCommandCall = callOpt.get(); + } else { + sqlCommandCall = parseBySqlParser(sqlParser, stmt); + } + } catch (Exception e) { + return Optional.empty(); + } + return Optional.of(sqlCommandCall); + + } + + private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception { + List<Operation> operations; + try { + operations = sqlParser.parse(stmt); + } catch (Throwable e) { + throw new Exception("Invalidate SQL statement.", e); + } + if (operations.size() != 1) { + throw new Exception("Only single statement is supported now."); + } + + final SqlCommand cmd; + String[] operands = new String[]{stmt}; + Operation operation = operations.get(0); + if (operation instanceof CatalogSinkModifyOperation) { + boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite(); + cmd = overwrite ? 
SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO; + } else if (operation instanceof CreateTableOperation) { + cmd = SqlCommand.CREATE_TABLE; + } else if (operation instanceof DropTableOperation) { + cmd = SqlCommand.DROP_TABLE; + } else if (operation instanceof AlterTableOperation) { + cmd = SqlCommand.ALTER_TABLE; + } else if (operation instanceof CreateViewOperation) { + cmd = SqlCommand.CREATE_VIEW; + } else if (operation instanceof DropViewOperation) { + cmd = SqlCommand.DROP_VIEW; + } else if (operation instanceof CreateDatabaseOperation) { + cmd = SqlCommand.CREATE_DATABASE; + } else if (operation instanceof DropDatabaseOperation) { + cmd = SqlCommand.DROP_DATABASE; + } else if (operation instanceof AlterDatabaseOperation) { + cmd = SqlCommand.ALTER_DATABASE; + } else if (operation instanceof CreateCatalogOperation) { + cmd = SqlCommand.CREATE_CATALOG; + } else if (operation instanceof DropCatalogOperation) { + cmd = SqlCommand.DROP_CATALOG; + } else if (operation instanceof UseCatalogOperation) { + cmd = SqlCommand.USE_CATALOG; + operands = new String[]{((UseCatalogOperation) operation).getCatalogName()}; + } else if (operation instanceof UseDatabaseOperation) { + cmd = SqlCommand.USE; + operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()}; + } else if (operation instanceof ShowCatalogsOperation) { + cmd = SqlCommand.SHOW_CATALOGS; + operands = new String[0]; + } else if (operation instanceof ShowDatabasesOperation) { + cmd = SqlCommand.SHOW_DATABASES; + operands = new String[0]; + } else if (operation instanceof ShowTablesOperation) { + cmd = SqlCommand.SHOW_TABLES; + operands = new String[0]; + } else if (operation instanceof ShowFunctionsOperation) { + cmd = SqlCommand.SHOW_FUNCTIONS; + operands = new String[0]; + } else if (operation instanceof CreateCatalogFunctionOperation || + operation instanceof CreateTempSystemFunctionOperation) { + cmd = SqlCommand.CREATE_FUNCTION; + } else if (operation instanceof 
DropCatalogFunctionOperation || + operation instanceof DropTempSystemFunctionOperation) { + cmd = SqlCommand.DROP_FUNCTION; + } else if (operation instanceof AlterCatalogFunctionOperation) { + cmd = SqlCommand.ALTER_FUNCTION; + } else if (operation instanceof ExplainOperation) { + cmd = SqlCommand.EXPLAIN; + } else if (operation instanceof DescribeTableOperation) { + cmd = SqlCommand.DESCRIBE; + operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()}; + } else if (operation instanceof QueryOperation) { + cmd = SqlCommand.SELECT; + } else { + throw new Exception("Unknown operation: " + operation.asSummaryString()); + } + + return new SqlCommandCall(cmd, operands, stmt); + } + + private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) { + // parse statement via regex matching + for (SqlCommand cmd : SqlCommand.values()) { + if (cmd.pattern != null) { + final Matcher matcher = cmd.pattern.matcher(stmt); + if (matcher.matches()) { + final String[] groups = new String[matcher.groupCount()]; + for (int i = 0; i < groups.length; i++) { + groups[i] = matcher.group(i + 1); + } + return cmd.operandConverter.apply(groups) + .map((operands) -> { + String[] newOperands = operands; + if (cmd == SqlCommand.EXPLAIN) { + // convert `explain xx` to `explain plan for xx` + // which can execute through executeSql method + newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]}; + } + return new SqlCommandCall(cmd, newOperands, stmt); + }); + } + } + } + return Optional.empty(); + } + + @Override + public void executeSql(Object tableEnv, String sql) { + ((TableEnvironment) tableEnv).executeSql(sql); + } + + @Override + public String sqlHelp() { + return MESSAGE_HELP.toString(); + } + + /** + * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. 
+ * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. + * @param catalogManager + * @param parserObject + * @param environmentSetting + */ + @Override + public void setCatalogManagerSchemaResolver(Object catalogManager, + Object parserObject, + Object environmentSetting) { + ((CatalogManager) catalogManager).setCatalogTableSchemaResolver( + new CatalogTableSchemaResolver((Parser)parserObject, + ((EnvironmentSettings)environmentSetting).isStreamingMode())); } @Override diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml index 5d6d41b..83e2c01 100644 --- a/flink/interpreter/pom.xml +++ b/flink/interpreter/pom.xml @@ -145,18 +145,6 @@ </dependency> <dependency> - <groupId>org.jline</groupId> - <artifactId>jline-terminal</artifactId> - <version>3.9.0</version> - </dependency> - - <dependency> - <groupId>org.jline</groupId> - <artifactId>jline-reader</artifactId> - <version>3.9.0</version> - </dependency> - - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-python_${scala.binary.version}</artifactId> <version>${flink.version}</version> diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java index f397187..f48dcb5 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java @@ -41,9 +41,11 @@ public class FlinkBatchSqlInterpreter extends FlinkSqlInterrpeter { @Override public void open() throws InterpreterException { - super.open(); + this.flinkInterpreter = + getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); this.tbenv = flinkInterpreter.getJavaBatchTableEnvironment("blink"); this.z = flinkInterpreter.getZeppelinContext(); + super.open(); } @Override diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 244559f..d624c43 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -21,6 +21,8 @@ import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.delegation.Planner; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index f2d31b6..659bf09 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -61,32 +61,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterrpeter.class); - public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder() - .append("The following commands are available:\n\n") - .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database.")) - .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'")) - .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. 
Syntax: 'CREATE VIEW <name> AS <query>;'")) - .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name.")) - .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'")) - .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name.")) - .append(formatCommand(SqlCommand.HELP, "Prints the available commands.")) - .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink.")) - .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")) - .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster.")) - .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.")) - .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions.")) - .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables.")) - .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster.")) - .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'")) - .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! 
Syntax: 'USE <name>;'")) - .style(AttributedStyle.DEFAULT.underline()) - .append("\nHint") - .style(AttributedStyle.DEFAULT) - .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.") - .toAttributedString(); - protected FlinkInterpreter flinkInterpreter; protected TableEnvironment tbenv; + private SqlCommandParser sqlCommandParser; private SqlSplitter sqlSplitter; private int defaultSqlParallelism; private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); @@ -105,8 +82,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { @Override public void open() throws InterpreterException { - flinkInterpreter = - getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); + sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv); this.sqlSplitter = new SqlSplitter(); JobListener jobListener = new JobListener() { @Override @@ -186,11 +162,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { List<String> sqls = sqlSplitter.splitSql(st); boolean isFirstInsert = true; for (String sql : sqls) { - Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql); + Optional<SqlCommandParser.SqlCommandCall> sqlCommand = sqlCommandParser.parse(sql); if (!sqlCommand.isPresent()) { try { context.out.write("%text Invalid Sql statement: " + sql + "\n"); - context.out.write(MESSAGE_HELP.toString()); + context.out.write(flinkInterpreter.getFlinkShims().sqlHelp()); } catch (IOException e) { return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); } @@ -275,6 +251,15 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { case SOURCE: callSource(cmdCall.operands[0], context); break; + case CREATE_FUNCTION: + callCreateFunction(cmdCall.operands[0], context); + break; + case DROP_FUNCTION: + callDropFunction(cmdCall.operands[0], context); + break; + case ALTER_FUNCTION: + callAlterFunction(cmdCall.operands[0], 
context); + break; case SHOW_FUNCTIONS: callShowFunctions(context); break; @@ -287,6 +272,13 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { case USE: callUseDatabase(cmdCall.operands[0], context); break; + case CREATE_CATALOG: + callCreateCatalog(cmdCall.operands[0], context); + break; + case DROP_CATALOG: + callDropCatalog(cmdCall.operands[0], context); + break; + case DESC: case DESCRIBE: callDescribe(cmdCall.operands[0], context); break; @@ -419,17 +411,27 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { context.out.write("Table has been dropped.\n"); } - private void callUseCatalog(String catalog, InterpreterContext context) { + private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { this.tbenv.useCatalog(catalog); } + private void callCreateCatalog(String sql, InterpreterContext context) throws IOException { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sql); + context.out.write("Catalog has been created.\n"); + } + + private void callDropCatalog(String sql, InterpreterContext context) throws IOException { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sql); + context.out.write("Catalog has been dropped.\n"); + } + private void callShowModules(InterpreterContext context) throws IOException { String[] modules = this.tbenv.listModules(); context.out.write("%table module\n" + StringUtils.join(modules, "\n") + "\n"); } private void callHelp(InterpreterContext context) throws IOException { - context.out.write(MESSAGE_HELP.toString()); + context.out.write(flinkInterpreter.getFlinkShims().sqlHelp()); } private void callShowCatalogs(InterpreterContext context) throws IOException { @@ -456,6 +458,21 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { runSqlList(sql, context); } + private void callCreateFunction(String sql, InterpreterContext context) throws IOException { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sql); + context.out.write("Function has been 
created.\n"); + } + + private void callDropFunction(String sql, InterpreterContext context) throws IOException { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sql); + context.out.write("Function has been dropped.\n"); + } + + private void callAlterFunction(String sql, InterpreterContext context) throws IOException { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sql); + context.out.write("Function has been modified.\n"); + } + private void callShowFunctions(InterpreterContext context) throws IOException { String[] functions = this.tbenv.listUserDefinedFunctions(); context.out.write( @@ -553,14 +570,4 @@ public abstract class FlinkSqlInterrpeter extends Interpreter { this.flinkInterpreter.cancel(context); } - private static AttributedString formatCommand(SqlCommand cmd, String description) { - return new AttributedStringBuilder() - .style(AttributedStyle.DEFAULT.bold()) - .append(cmd.toString()) - .append("\t\t") - .style(AttributedStyle.DEFAULT) - .append(description) - .append('\n') - .toAttributedString(); - } } diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java index ab05526..c027341 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java @@ -43,8 +43,10 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter { @Override public void open() throws InterpreterException { - super.open(); + this.flinkInterpreter = + getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class); this.tbenv = flinkInterpreter.getJavaStreamTableEnvironment("blink"); + super.open(); } @Override diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 
75d06bb..b514f49 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -354,13 +354,14 @@ public class TableEnvFactory { Executor executor = lookupExecutor(executorProperties, senv.getJavaEnv()); Map<String, String> plannerProperties = settings.toPlannerProperties(); - ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) .create( plannerProperties, executor, tblConfig, blinkFunctionCatalog, catalogManager); + this.flinkShims.setCatalogManagerSchemaResolver(catalogManager, planner.getParser(), settings); } private static Executor lookupExecutor( diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java new file mode 100644 index 0000000..483910e --- /dev/null +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaLower.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.flink; + +import org.apache.flink.table.functions.ScalarFunction; + +public class JavaLower extends ScalarFunction { + public String eval(String str) { + return str.toLowerCase(); + } +} diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java new file mode 100644 index 0000000..d444084 --- /dev/null +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/JavaUpper.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.flink; + +import org.apache.flink.table.functions.ScalarFunction; + +public class JavaUpper extends ScalarFunction { + public String eval(String str) { + return str.toUpperCase(); + } +} diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java index 94a9edc..f84564a 100644 --- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java @@ -64,6 +64,7 @@ import static org.apache.zeppelin.interpreter.InterpreterResult.Code; import static org.apache.zeppelin.interpreter.InterpreterResult.Type; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; @@ -336,7 +337,7 @@ public abstract class SqlInterpreterTest { assertEquals(Type.TEXT, resultMessages.get(0).getType()); assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("already exists")); - // show tables + // show view context = getInterpreterContext(); result = sqlInterpreter.interpret("show tables", context); assertEquals(Code.SUCCESS, result.code()); @@ -344,7 +345,7 @@ public abstract class SqlInterpreterTest { assertEquals(Type.TABLE, resultMessages.get(0).getType()); assertEquals("table\nmy_view\nsource_table\n", resultMessages.get(0).getData()); - // drop table + // drop view context = getInterpreterContext(); result = sqlInterpreter.interpret("drop view my_view", context); assertEquals(Code.SUCCESS, result.code()); @@ -368,6 +369,129 @@ public abstract class SqlInterpreterTest { resultMessages.get(0).getData().contains("The following commands are available")); } + + @Test + public void testFunction() throws IOException, InterpreterException { + + FlinkVersion flinkVersion = 
flinkInterpreter.getFlinkVersion(); + if(!flinkVersion.isFlink110()){ + InterpreterContext context = getInterpreterContext(); + + // CREATE UDF + InterpreterResult result = sqlInterpreter.interpret( + "CREATE FUNCTION myudf AS 'org.apache.zeppelin.flink.JavaUpper' ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been created.")); + + // SHOW UDF + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "SHOW FUNCTIONS ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf")); + + + // ALTER + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been modified.")); + + + // DROP UDF + context = getInterpreterContext(); + result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been dropped.")); + + + // SHOW UDF. 
Due to drop UDF before, it shouldn't contain 'myudf' + result = sqlInterpreter.interpret( + "SHOW FUNCTIONS ;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertFalse(resultMessages.toString(), resultMessages.get(0).getData().contains("myudf")); + } else { + // Flink1.10 don't support ddl for function + assertTrue(flinkVersion.isFlink110()); + } + + } + + @Test + public void testCatelog() throws IOException, InterpreterException{ + FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); + + if (!flinkVersion.isFlink110()){ + InterpreterContext context = getInterpreterContext(); + + // CREATE CATALOG + InterpreterResult result = sqlInterpreter.interpret( + "CREATE CATALOG test_catalog \n" + + "WITH( \n" + + "'type'='generic_in_memory' \n" + + ");", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been created.")); + + // USE CATALOG & SHOW DATABASES; + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "USE CATALOG test_catalog ;\n" + + "SHOW DATABASES;", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default")); + + // DROP CATALOG + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "DROP CATALOG test_catalog ;\n", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been dropped.")); 
+ + // SHOW CATALOG. Due to drop CATALOG before, it shouldn't contain 'test_catalog' + context = getInterpreterContext(); + result = sqlInterpreter.interpret( + "SHOW CATALOGS ;\n", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog")); + assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog")); + } else { + // Flink1.10 don't support ddl for catalog + assertTrue(flinkVersion.isFlink110()); + } + + } + + @Test + public void testShowModules() throws InterpreterException, IOException { + FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion(); + + if (!flinkVersion.isFlink110()) { + InterpreterContext context = getInterpreterContext(); + + // CREATE CATALOG + InterpreterResult result = sqlInterpreter.interpret( + "show modules", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); + assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core")); + } else { + // Flink1.10 don't support show modules + assertTrue(flinkVersion.isFlink110()); + } + } + + protected InterpreterContext getInterpreterContext() { InterpreterContext context = InterpreterContext.builder() .setParagraphId("paragraphId")