This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 05ae5e4 [ZEPPELIN-5001]. Drop view doesn't work in flink interpreter 05ae5e4 is described below commit 05ae5e45c6069498c4dca761e543c0712a3dfe2d Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Oct 22 10:18:08 2020 +0800 [ZEPPELIN-5001]. Drop view doesn't work in flink interpreter ### What is this PR for? The root cause is that Flink 1.10 has only temporary views, whereas Flink 1.11 introduced permanent views, so on Flink 1.11 and later we should execute the DDL statement to drop a view instead of calling `tbenv.dropTemporaryView` (which is still used for Flink 1.10) ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5101 ### How should this be tested? * Unit test is added https://travis-ci.org/github/zjffdu/zeppelin/builds/737948009 ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3949 from zjffdu/ZEPPELIN-5101 and squashes the following commits: 64f4d218e [Jeff Zhang] [ZEPPELIN-5001]. 
Drop view doesn't work in flink interpreter (cherry picked from commit b1aac73469baa3cb3ce2a71147b9124c1d15a143) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 17 ++++++++++++++--- .../org/apache/zeppelin/flink/SqlInterpreterTest.java | 8 ++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java index d85e170..3d00bc8 100644 --- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java +++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java @@ -273,7 +273,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter { callCreateView(cmdCall, context); break; case DROP_VIEW: - callDropView(cmdCall.operands[0], context); + callDropView(cmdCall, context); break; case CREATE_DATABASE: callCreateDatabase(cmdCall.operands[0], context); @@ -338,8 +338,19 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter { context.out.write("Database has been created.\n"); } - private void callDropView(String view, InterpreterContext context) throws IOException { - this.tbenv.dropTemporaryView(view); + private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException { + try { + lock.lock(); + if (flinkInterpreter.getFlinkVersion().isFlink110()) { + this.tbenv.dropTemporaryView(sqlCommand.operands[0]); + } else { + flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } context.out.write("View has been dropped.\n"); } diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java index 2bd9ee8..bee3159 
100644 --- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java +++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java @@ -353,6 +353,14 @@ public abstract class SqlInterpreterTest { assertEquals(1, resultMessages.size()); assertEquals("View has been dropped.\n", resultMessages.get(0).getData()); + // show tables again + context = getInterpreterContext(); + result = sqlInterpreter.interpret("show tables", context); + assertEquals(Code.SUCCESS, result.code()); + resultMessages = context.out.toInterpreterResultMessage(); + assertEquals(Type.TABLE, resultMessages.get(0).getType()); + assertEquals("table\nsource_table\n", resultMessages.get(0).getData()); + // create temporary view if (!flinkInterpreter.getFlinkVersion().isFlink110()) { context = getInterpreterContext();