This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new dc45a89  [ZEPPELIN-4688]. Support set statement in flink sql
dc45a89 is described below

commit dc45a899dce0a7307663ae5ade51325763dc7f00
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Mar 19 14:28:55 2020 +0800

    [ZEPPELIN-4688]. Support set statement in flink sql
    
    ### What is this PR for?
    This PR adds support for the SET statement in Flink SQL. Properties set
    via a SET statement only affect the current paragraph; they are reset
    after the paragraph finishes.
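
    For illustration, a single paragraph could then look like this (a sketch
    mirroring the included test; the table names and the option value come
    from the test setup):

        -- applies only to this paragraph; reset once the paragraph finishes
        SET table.exec.resource.default-parallelism=10;
        INSERT INTO sink_table SELECT * FROM source_table;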
    
    ### What type of PR is it?
    [ Feature ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4688
    
    ### How should this be tested?
    * CI Pass
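    * Manually: run a SET statement together with a query in one paragraph
      and check that unknown keys are rejected (a sketch mirroring the
      included test; table names come from the test setup):

          SET table.optimizer.source.predicate-pushdown-enabled=false;
          INSERT INTO sink_table SELECT * FROM source_table;
          -- an unknown key such as table.invalid_config should fail with
          -- "... is not a valid table/sql config"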
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3693 from zjffdu/ZEPPELIN-4688 and squashes the following commits:
    
    790b241db [Jeff Zhang] [ZEPPELIN-4688]. Support set statement in flink sql
---
 flink/pom.xml                                      |  4 +
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java | 87 ++++++++++++++++++++--
 .../flink/FlinkBatchSqlInterpreterTest.java        | 59 +++++++++++++++
 3 files changed, 144 insertions(+), 6 deletions(-)

diff --git a/flink/pom.xml b/flink/pom.xml
index 9be4fe5..21ec182 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -492,6 +492,10 @@
           <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index d794b4b..82ac50e 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -22,12 +22,16 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.python.PythonConfig;
+import org.apache.flink.python.PythonOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -42,15 +46,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public abstract class FlinkSqlInterrpeter extends Interpreter {
@@ -69,6 +72,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
@@ -86,7 +90,11 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
  private SqlSplitter sqlSplitter;
  private int defaultSqlParallelism;
  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
-
+  // all the available SQL config options; see
+  // https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
+  private Map<String, ConfigOption> tableConfigOptions;
+  // the config options set in the current paragraph
+  private Map<String, String> currentConfigOptions = new HashMap<>();
 
   public FlinkSqlInterrpeter(Properties properties) {
     super(properties);
@@ -117,6 +125,31 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
    flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
    flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
     this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
+    this.tableConfigOptions = extractTableConfigOptions();
+  }
+
+  private Map<String, ConfigOption> extractTableConfigOptions() {
+    Map<String, ConfigOption> configOptions = new HashMap<>();
+    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
+    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    return configOptions;
+  }
+
+  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
+    Map<String, ConfigOption> configOptions = new HashMap();
+    Field[] fields = clazz.getDeclaredFields();
+    for (Field field : fields) {
+      if (field.getType().isAssignableFrom(ConfigOption.class)) {
+        try {
+          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
+          configOptions.put(configOption.key(), configOption);
+        } catch (Throwable e) {
+          LOGGER.warn("Fail to get ConfigOption", e);
+        }
+      }
+    }
+    return configOptions;
   }
 
   @Override
@@ -139,6 +172,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   }
 
   private InterpreterResult runSqlList(String st, InterpreterContext context) {
+    currentConfigOptions.clear();
     List<String> sqls = sqlSplitter.splitSql(st);
     for (String sql : sqls) {
      Optional<SqlCommandParser.SqlCommandCall> sqlCommand = SqlCommandParser.parse(sql);
@@ -210,6 +244,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
       case SELECT:
         callSelect(cmdCall.operands[0], context);
         break;
+      case SET:
+        callSet(cmdCall.operands[0], cmdCall.operands[1], context);
+        break;
       case INSERT_INTO:
       case INSERT_OVERWRITE:
         callInsertInto(cmdCall.operands[0], context);
@@ -401,25 +438,47 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
  public void callSelect(String sql, InterpreterContext context) throws IOException {
     try {
       lock.lock();
+      // set parallelism from paragraph local property
       if (context.getLocalProperties().containsKey("parallelism")) {
        this.tbenv.getConfig().getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                        Integer.parseInt(context.getLocalProperties().get("parallelism")));
       }
-      callInnerSelect(sql, context);
 
+      // apply the table config options set via SET statements so far
+      for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+        this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+      }
+      callInnerSelect(sql, context);
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
       }
+      // reset parallelism
      this.tbenv.getConfig().getConfiguration()
              .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                      defaultSqlParallelism);
+      // reset table config
+      for (ConfigOption configOption: tableConfigOptions.values()) {
+        // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+        if (configOption.defaultValue() != null) {
+          this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+        }
+      }
+      this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
     }
   }
 
  public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;
 
+  public void callSet(String key, String value, InterpreterContext context) throws IOException {
+    if (!tableConfigOptions.containsKey(key)) {
+      throw new IOException(key + " is not a valid table/sql config, please check link: " +
+              "https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
+    }
+    currentConfigOptions.put(key, value);
+  }
+
   private void callInsertInto(String sql,
                               InterpreterContext context) throws IOException {
      if (!isBatch()) {
@@ -432,6 +491,12 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                        Integer.parseInt(context.getLocalProperties().get("parallelism")));
        }
+
+       // apply the table config options set via SET statements so far
+       for (Map.Entry<String, String> entry : currentConfigOptions.entrySet()) {
+         this.tbenv.getConfig().getConfiguration().setString(entry.getKey(), entry.getValue());
+       }
+
        this.tbenv.sqlUpdate(sql);
        this.tbenv.execute(sql);
      } catch (Exception e) {
@@ -440,9 +505,19 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
        if (lock.isHeldByCurrentThread()) {
          lock.unlock();
        }
+
+       // reset parallelism
        this.tbenv.getConfig().getConfiguration()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
                        defaultSqlParallelism);
+       // reset table config
+       for (ConfigOption configOption: tableConfigOptions.values()) {
+         // some options have no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
+         if (configOption.defaultValue() != null) {
+           this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
+         }
+       }
+       this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
      }
      context.out.write("Insertion successfully.\n");
   }
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index 3fe35c5..c75d7fe 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -20,6 +20,8 @@ package org.apache.zeppelin.flink;
 
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -237,4 +239,61 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     //    resultMessages = context.out.toInterpreterResultMessage();
     //    assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
   }
+
+  @Test
+  public void testSetTableConfig() throws InterpreterException, IOException {
+    hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'a'), (2, 'b')");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE TABLE sink_table (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // set parallelism then insert into
+    InterpreterContext context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.exec.resource.default-parallelism=10;" +
+            "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+
+    // set then insert into
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.optimizer.source.predicate-pushdown-enabled=false;" +
+                    "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+    assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
+            sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));
+
+    // invalid config
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "set table.invalid_config=false;" +
+                    "insert into sink_table select * from source_table", context);
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertTrue(resultMessages.get(0).getData(),
+            resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
+  }
 }
