This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 05ae5e4  [ZEPPELIN-5001]. Drop view doesn't work in flink interpreter
05ae5e4 is described below

commit 05ae5e45c6069498c4dca761e543c0712a3dfe2d
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Thu Oct 22 10:18:08 2020 +0800

    [ZEPPELIN-5001]. Drop view doesn't work in flink interpreter
    
    ### What is this PR for?
    
    The root cause is that in flink 1.10 there's only temporary view, but in 
1.11 flink introduce permanent view, so we should always use ddl to drop view 
instead of calling `tbenv.dropTemporaryView`
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5101
    
    ### How should this be tested?
    * Unit test is added
    https://travis-ci.org/github/zjffdu/zeppelin/builds/737948009
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3949 from zjffdu/ZEPPELIN-5101 and squashes the following commits:
    
    64f4d218e [Jeff Zhang] [ZEPPELIN-5001]. Drop view doesn't work in flink 
interpreter
    
    (cherry picked from commit b1aac73469baa3cb3ce2a71147b9124c1d15a143)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../org/apache/zeppelin/flink/FlinkSqlInterrpeter.java  | 17 ++++++++++++++---
 .../org/apache/zeppelin/flink/SqlInterpreterTest.java   |  8 ++++++++
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index d85e170..3d00bc8 100644
--- 
a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ 
b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -273,7 +273,7 @@ public abstract class FlinkSqlInterrpeter extends 
AbstractInterpreter {
         callCreateView(cmdCall, context);
         break;
       case DROP_VIEW:
-        callDropView(cmdCall.operands[0], context);
+        callDropView(cmdCall, context);
         break;
       case CREATE_DATABASE:
         callCreateDatabase(cmdCall.operands[0], context);
@@ -338,8 +338,19 @@ public abstract class FlinkSqlInterrpeter extends 
AbstractInterpreter {
     context.out.write("Database has been created.\n");
   }
 
-  private void callDropView(String view, InterpreterContext context) throws 
IOException {
-    this.tbenv.dropTemporaryView(view);
+  private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, 
InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+        this.tbenv.dropTemporaryView(sqlCommand.operands[0]);
+      } else {
+        flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
     context.out.write("View has been dropped.\n");
   }
 
diff --git 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 2bd9ee8..bee3159 100644
--- 
a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ 
b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -353,6 +353,14 @@ public abstract class SqlInterpreterTest {
     assertEquals(1, resultMessages.size());
     assertEquals("View has been dropped.\n", resultMessages.get(0).getData());
 
+    // show tables again
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show tables", context);
+    assertEquals(Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("table\nsource_table\n", resultMessages.get(0).getData());
+
     // create temporary view
     if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
       context = getInterpreterContext();

Reply via email to