This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 499969c  [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't work in flink interpreter
499969c is described below

commit 499969cdd05ebf7533c7fc340147312c0212a61e
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Jul 15 13:59:48 2020 +0800

    [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't work in flink interpreter

    ### What is this PR for?
    Flink 1.11 introduces new table configuration options, so the config-option
    extraction should be moved into FlinkShims as well, letting each shim collect
    the option classes available in its Flink version.

    ### What type of PR is it?
    [Bug Fix]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4956

    ### How should this be tested?
    * CI pass https://travis-ci.org/github/zjffdu/zeppelin/builds/708230941

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3857 from zjffdu/ZEPPELIN-4956 and squashes the following commits:

    5e60c2989 [Jeff Zhang] [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't work in flink interpreter

    (cherry picked from commit e131b691c94e60a574e8a54b99cb7c2522cae8c1)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  3 ++
 .../org/apache/zeppelin/flink/Flink110Shims.java   | 34 +++++++++++++++++++++-
 .../org/apache/zeppelin/flink/Flink111Shims.java   | 33 +++++++++++++++++++++
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 26 +----------------
 .../apache/zeppelin/flink/SqlInterpreterTest.java  | 21 ++++++++++++-
 5 files changed, 90 insertions(+), 27 deletions(-)
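The core of the change is a small reflection scan over Flink's option-holding classes. Below is a standalone sketch of that technique, assuming Flink 1.11 is on the classpath; the class name ConfigOptionScan is made up for the demo and is not part of the patch.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.config.TableConfigOptions;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Sketch of the patch's extraction technique: reflect over a class's
// declared fields and collect every static ConfigOption into a
// key -> option map.
public class ConfigOptionScan {
  public static void main(String[] args) throws Exception {
    Map<String, ConfigOption> options = new HashMap<>();
    for (Field field : TableConfigOptions.class.getDeclaredFields()) {
      if (field.getType().isAssignableFrom(ConfigOption.class)) {
        // The fields are static, so the receiver passed to get() is
        // ignored; the patch passes ConfigOption.class, null works too.
        ConfigOption option = (ConfigOption) field.get(null);
        options.put(option.key(), option);
      }
    }
    // TableConfigOptions (new in Flink 1.11) declares table.sql-dialect,
    // which is why only the 1.11 shim scans it. Prints true on 1.11.
    System.out.println(options.containsKey("table.sql-dialect"));
  }
}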
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 7918aab..55c3b00 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -132,4 +133,6 @@ public abstract class FlinkShims {
                                                  Object environmentSetting);
 
   public abstract Object getCustomCli(Object cliFrontend, Object commandLine);
+
+  public abstract Map extractTableConfigOptions();
 }
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 4479b6b..39ab6e0 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -23,10 +23,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.python.util.ResourceUtil;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.scala.BatchTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -48,9 +52,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.nio.file.Files;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.regex.Matcher;
@@ -215,9 +222,34 @@ public class Flink110Shims extends FlinkShims {
                                           Object environmentSetting) {
     // do nothing for flink 1.10
   }
-
+
   @Override
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
     return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) commandLine);
   }
+
+  @Override
+  public Map extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
+  }
 }
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index d1b89fd..2b9185b 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -25,14 +25,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -84,8 +89,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -391,4 +398,30 @@ public class Flink111Shims extends FlinkShims {
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
     return ((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine);
   }
+
+  @Override
+  public Map extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
+  }
 }
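The 1.11 shim differs from the 1.10 one only by additionally scanning TableConfigOptions, the class (new in Flink 1.11) that declares table.sql-dialect. Keeping the scan inside each shim means the interpreter module below never references version-specific option classes directly.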
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 659bf09..3214ca5 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -102,31 +102,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
-    this.tableConfigOptions = extractTableConfigOptions();
-  }
-
-  private Map<String, ConfigOption> extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
+    this.tableConfigOptions = flinkInterpreter.getFlinkShims().extractTableConfigOptions();
   }
 
   @Override
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index f84564a..ffe2fce 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -424,7 +424,7 @@ public abstract class SqlInterpreterTest {
   }
 
   @Test
-  public void testCatelog() throws IOException, InterpreterException{
+  public void testCatalog() throws IOException, InterpreterException{
     FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
 
     if (!flinkVersion.isFlink110()){
@@ -473,6 +473,25 @@ public abstract class SqlInterpreterTest {
   }
 
   @Test
+  public void testSetProperty() throws InterpreterException {
+    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+
+    if (!flinkVersion.isFlink110()){
+      InterpreterContext context = getInterpreterContext();
+      InterpreterResult result = sqlInterpreter.interpret(
+              "set table.sql-dialect=hive", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+    } else {
+      // Flink 1.10 doesn't support set table.sql-dialect, which is introduced in flink 1.11
+      InterpreterContext context = getInterpreterContext();
+      InterpreterResult result = sqlInterpreter.interpret(
+              "set table.sql-dialect=hive", context);
+      assertEquals(context.out.toString(), Code.ERROR, result.code());
+    }
+  }
+
+  @Test
  public void testShowModules() throws InterpreterException, IOException {
    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
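The new test drives the fix end to end: set table.sql-dialect=hive succeeds on Flink 1.11 and fails on 1.10, because the key only appears in the 1.11 shim's map. As a rough illustration of how a "set key=value" statement can be checked against the extracted map (a hypothetical sketch, not the actual FlinkSqlInterrpeter code; applySetStatement and SetStatementSketch are made-up names):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Map;

public class SetStatementSketch {
  // Hypothetical helper: validate "set key=value" against the map returned
  // by FlinkShims.extractTableConfigOptions(), then apply it.
  static void applySetStatement(String key, String value,
                                Map<String, ConfigOption> tableConfigOptions,
                                TableEnvironment tableEnv) {
    if (!tableConfigOptions.containsKey(key)) {
      // On Flink 1.10, "table.sql-dialect" is absent from the map, which
      // corresponds to the ERROR branch asserted in testSetProperty.
      throw new IllegalArgumentException(key + " is not a valid table config option");
    }
    // TableConfig#getConfiguration() exposes the underlying Configuration.
    tableEnv.getConfig().getConfiguration().setString(key, value);
  }
}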