This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 499969c  [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't work in 
flink interpreter
499969c is described below

commit 499969cdd05ebf7533c7fc340147312c0212a61e
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Jul 15 13:59:48 2020 +0800

    [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't work in flink 
interpreter
    
    ### What is this PR for?
    Flink 1.11 introduces new configuration options (e.g. table.sql-dialect), so the
    extraction of table config options should be moved into FlinkShims as well.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4956
    
    ### How should this be tested?
    * CI pass
    https://travis-ci.org/github/zjffdu/zeppelin/builds/708230941
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3857 from zjffdu/ZEPPELIN-4956 and squashes the following commits:
    
    5e60c2989 [Jeff Zhang] [ZEPPELIN-4956]. set table.sql-dialect=hive doesn't 
work in flink interpreter
    
    (cherry picked from commit e131b691c94e60a574e8a54b99cb7c2522cae8c1)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  3 ++
 .../org/apache/zeppelin/flink/Flink110Shims.java   | 34 +++++++++++++++++++++-
 .../org/apache/zeppelin/flink/Flink111Shims.java   | 33 +++++++++++++++++++++
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 26 +----------------
 .../apache/zeppelin/flink/SqlInterpreterTest.java  | 21 ++++++++++++-
 5 files changed, 90 insertions(+), 27 deletions(-)

diff --git 
a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java 
b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 7918aab..55c3b00 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -132,4 +133,6 @@ public abstract class FlinkShims {
                                                        Object 
environmentSetting);
 
   public abstract Object getCustomCli(Object cliFrontend, Object commandLine);
+
+  public abstract Map extractTableConfigOptions();
 }
diff --git 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 4479b6b..39ab6e0 100644
--- 
a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ 
b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -23,10 +23,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.python.util.ResourceUtil;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.scala.BatchTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -48,9 +52,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.nio.file.Files;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.regex.Matcher;
@@ -215,9 +222,34 @@ public class Flink110Shims extends FlinkShims {
                                               Object environmentSetting) {
     // do nothing for flink 1.10
   }
-  
+
   @Override
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
     return ((CliFrontend)cliFrontend).getActiveCustomCommandLine((CommandLine) 
commandLine);
   }
+
+  @Override
+  public Map extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) 
field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
+  }
 }
diff --git 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index d1b89fd..2b9185b 100644
--- 
a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ 
b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -25,14 +25,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -84,8 +89,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -391,4 +398,30 @@ public class Flink111Shims extends FlinkShims {
   public Object getCustomCli(Object cliFrontend, Object commandLine) {
     return 
((CliFrontend)cliFrontend).validateAndGetActiveCommandLine((CommandLine) 
commandLine);
   }
+
+  @Override
+  public Map extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) 
field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
+  }
 }
diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 659bf09..3214ca5 100644
--- 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -102,31 +102,7 @@ public abstract class FlinkSqlInterrpeter extends 
Interpreter {
     
flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     
flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
-    this.tableConfigOptions = extractTableConfigOptions();
-  }
-
-  private Map<String, ConfigOption> extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) 
field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
+    this.tableConfigOptions = 
flinkInterpreter.getFlinkShims().extractTableConfigOptions();
   }
 
   @Override
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index f84564a..ffe2fce 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -424,7 +424,7 @@ public abstract class SqlInterpreterTest {
   }
 
   @Test
-  public void testCatelog() throws IOException, InterpreterException{
+  public void testCatalog() throws IOException, InterpreterException{
     FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
 
     if (!flinkVersion.isFlink110()){
@@ -473,6 +473,25 @@ public abstract class SqlInterpreterTest {
   }
 
   @Test
+  public void testSetProperty() throws InterpreterException {
+    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+
+    if (!flinkVersion.isFlink110()){
+      InterpreterContext context = getInterpreterContext();
+      InterpreterResult result = sqlInterpreter.interpret(
+              "set table.sql-dialect=hive", context);
+      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, 
result.code());
+
+    } else {
+      // Flink 1.10 doesn't support setting table.sql-dialect, which was introduced in Flink 1.11
+      InterpreterContext context = getInterpreterContext();
+      InterpreterResult result = sqlInterpreter.interpret(
+              "set table.sql-dialect=hive", context);
+      assertEquals(context.out.toString(), Code.ERROR, result.code());
+    }
+  }
+
+  @Test
   public void testShowModules() throws InterpreterException, IOException {
     FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
 

Reply via email to