This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 19808172ed [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi 
(#4770)
19808172ed is described below

commit 19808172ed0dc589ef35e847cb4e9a276769fd77
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Wed Jul 3 22:11:18 2024 +0800

    [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi (#4770)
    
    * [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi
    
    * fix style
---
 jdbc/pom.xml                                       |   8 ++
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  |  13 ++-
 .../jdbc/kyuubi/BeelineInPlaceUpdateStream.java    | 118 +++++++++++++++++++++
 .../apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java   | 118 +++++++++++++++++++++
 .../apache/zeppelin/jdbc/kyuubi/ProgressBar.java   |  53 +++++++++
 jdbc/src/main/resources/interpreter-setting.json   |  21 ++++
 6 files changed, 328 insertions(+), 3 deletions(-)

diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 35be95c71d..f60b66aae6 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -39,6 +39,7 @@
     <h2.version>2.2.220</h2.version>
     <commons.dbcp2.version>2.0.1</commons.dbcp2.version>
     <hive3.version>3.1.3</hive3.version>
+    <kyuubi.version>1.9.1</kyuubi.version>
 
     <!--test library versions-->
     <mockrunner.jdbc.version>1.0.8</mockrunner.jdbc.version>
@@ -135,6 +136,13 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+      <version>${kyuubi.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>concurrentunit</artifactId>
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index e31c6e4a5d..b4cfba25b4 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -35,6 +35,7 @@ import 
org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.util.SqlSplitter;
 import org.apache.zeppelin.jdbc.hive.HiveUtils;
+import org.apache.zeppelin.jdbc.kyuubi.KyuubiUtils;
 import org.apache.zeppelin.tabledata.TableDataUtils;
 import org.apache.zeppelin.util.PropertiesUtil;
 import org.slf4j.Logger;
@@ -825,10 +826,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
           String jdbcURL = 
getJDBCConfiguration(user).getProperty().getProperty(URL_KEY);
           String driver =
                   
getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY);
-          if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://")
-                  && driver != null && 
driver.equals("org.apache.hive.jdbc.HiveDriver")) {
-            HiveUtils.startHiveMonitorThread(statement, context,
+          if (jdbcURL != null && driver != null) {
+            if (driver.equals("org.apache.hive.jdbc.HiveDriver") &&
+                jdbcURL.startsWith("jdbc:hive2://")) {
+              HiveUtils.startHiveMonitorThread(statement, context,
                     Boolean.parseBoolean(getProperty("hive.log.display", 
"true")), this);
+            } else if 
(driver.equals("org.apache.kyuubi.jdbc.KyuubiHiveDriver") &&
+                (jdbcURL.startsWith("jdbc:kyuubi://") || 
jdbcURL.startsWith("jdbc:hive2://"))) {
+              KyuubiUtils.startMonitorThread(connection, statement, context,
+                  Boolean.parseBoolean(getProperty("kyuubi.log.display", 
"true")), this);
+            }
           }
           boolean isResultSetAvailable = statement.execute(sqlToExecute);
           getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful();
diff --git 
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000000..4218461373
--- /dev/null
+++ 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProgressUpdateResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * {@link InPlaceUpdateStream} implementation that renders the progress responses
+ * it receives through an {@link InPlaceUpdate} writing to a caller-supplied
+ * {@link PrintStream}.
+ */
+public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BeelineInPlaceUpdateStream.class);
+
+  // Renderer that draws each progress snapshot onto the output stream.
+  private InPlaceUpdate inPlaceUpdate;
+  // Notifier handed in by the caller; queried before rendering and told when progress is done.
+  private EventNotifier notifier;
+  // System.currentTimeMillis() of the most recent rendered update; 0 until the first render.
+  private long lastUpdateTimestamp;
+
+  /**
+   * @param out      stream the progress bar is rendered to
+   * @param notifier notifier used to gate rendering and to signal completion
+   */
+  public BeelineInPlaceUpdateStream(PrintStream out,
+                                    EventNotifier notifier) {
+    this.inPlaceUpdate = new InPlaceUpdate(out);
+    this.notifier = notifier;
+  }
+
+  /**
+   * Handles one progress response.
+   * <p>
+   * A {@code null} response or a {@code NOT_AVAILABLE} status marks the progress bar as
+   * completed. Otherwise the response is rendered, but only once the notifier reports that
+   * the operation log has been updated at least once; until then the response is dropped.
+   *
+   * @param response progress response to handle, may be {@code null}
+   */
+  @Override
+  public void update(TProgressUpdateResp response) {
+    if (response == null || 
response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) {
+      /*
+        We set it to completed if there is nothing the server has to report,
+        for example, for DDL statements.
+      */
+      notifier.progressBarCompleted();
+    } else if (notifier.isOperationLogUpdatedAtLeastOnce()) {
+      /*
+        Try to render the in-place progress bar only after the operation logs have
+        been updated at least once, as this will hopefully allow printing metadata
+        such as the query id and application id first.
+        These notifiers have to be removed when the operation logs get merged into
+        GetOperationStatus.
+      */
+      lastUpdateTimestamp = System.currentTimeMillis();
+      LOGGER.info("update progress: {}", response.getProgressedPercentage());
+      inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+    }
+  }
+
+  /**
+   * @return the {@code System.currentTimeMillis()} value recorded by the last rendered
+   *         update, or 0 if nothing has been rendered yet
+   */
+  public long getLastUpdateTimestamp() {
+    return lastUpdateTimestamp;
+  }
+
+  /**
+   * @return the notifier passed to the constructor
+   */
+  @Override
+  public EventNotifier getEventNotifier() {
+    return notifier;
+  }
+
+  /**
+   * Adapts a {@link TProgressUpdateResp} to the {@link ProgressMonitor} interface by
+   * delegating each accessor to the corresponding getter of the response.
+   */
+  static class ProgressMonitorWrapper implements ProgressMonitor {
+    private TProgressUpdateResp response;
+
+    ProgressMonitorWrapper(TProgressUpdateResp response) {
+      this.response = response;
+    }
+
+    @Override
+    public List<String> headers() {
+      return response.getHeaderNames();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return response.getRows();
+    }
+
+    @Override
+    public String footerSummary() {
+      return response.getFooterSummary();
+    }
+
+    @Override
+    public long startTime() {
+      return response.getStartTime();
+    }
+
+    /**
+     * Not supported by this wrapper.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public String executionStatus() {
+      throw new UnsupportedOperationException(
+              "This should never be used for anything. All the required data 
is " +
+                      "available via other methods"
+      );
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return response.getProgressedPercentage();
+    }
+  }
+}
+
diff --git 
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java
new file mode 100644
index 0000000000..1d10845c46
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+import org.apache.commons.dbcp2.DelegatingConnection;
+import org.apache.commons.dbcp2.DelegatingStatement;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kyuubi.jdbc.hive.KyuubiConnection;
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.jdbc.JDBCInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kyuubi-specific helpers for the JDBC interpreter,
+ * e.g. displaying Kyuubi execution logs and the engine URL while a statement runs.
+ *
+ */
+public class KyuubiUtils {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KyuubiUtils.class);
+  // Fallback for zeppelin.jdbc.kyuubi.monitor.query_interval; passed to Thread.sleep (ms).
+  private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000;
+  /**
+   * Starts a daemon thread that polls the Kyuubi statement for execution logs,
+   * optionally writes them to the paragraph output, and reports the job URL to the
+   * interpreter event client once an engine id or engine URL is available.
+   * After starting the thread, installs an in-place progress stream on the statement.
+   *
+   * @param conn            connection that is cast to a {@link KyuubiConnection} after
+   *                        unwrapping two {@link DelegatingConnection} layers
+   * @param stmt            statement that is cast to a {@link KyuubiStatement} after
+   *                        unwrapping two {@link DelegatingStatement} layers
+   * @param context         interpreter context whose output receives the logs
+   * @param displayLog      default used when the paragraph-local "displayLog" property
+   *                        is looked up
+   * @param jdbcInterpreter interpreter the monitor properties are read from
+   */
+  public static void startMonitorThread(Connection conn,
+                                        Statement stmt,
+                                        InterpreterContext context,
+                                        boolean displayLog,
+                                        JDBCInterpreter jdbcInterpreter) {
+    KyuubiConnection kyuubiConn = (KyuubiConnection) ((DelegatingConnection<?>)
+        ((DelegatingConnection<?>) conn).getDelegate()).getDelegate();
+    KyuubiStatement kyuubiStmt = (KyuubiStatement)
+            ((DelegatingStatement) ((DelegatingStatement) 
stmt).getDelegate()).getDelegate();
+    // progressBar is final so that it can be captured by the monitor thread's lambda.
+    final ProgressBar progressBar = new ProgressBar();
+    final long queryInterval = Long.parseLong(
+            
jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.monitor.query_interval",
+                    DEFAULT_QUERY_PROGRESS_INTERVAL + ""));
+    Thread thread = new Thread(() -> {
+      String jobUrlTemplate = 
jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.jobUrl.template");
+      // Ensures the job URL is reported to the event client at most once.
+      boolean jobUrlExtracted = false;
+
+      try {
+        while (kyuubiStmt.hasMoreLogs() && !kyuubiStmt.isClosed() && 
!Thread.interrupted()) {
+          // Sleep first, then fetch the logs.
+          Thread.sleep(queryInterval);
+          List<String> logs = kyuubiStmt.getExecLog();
+          String logsOutput = StringUtils.join(logs, System.lineSeparator());
+          LOGGER.debug("Kyuubi job output: {}", logsOutput);
+          boolean displayLogProperty = 
context.getBooleanLocalProperty("displayLog", displayLog);
+          if (displayLogProperty && !StringUtils.isBlank(logsOutput)) {
+            context.out.write(logsOutput + "\n");
+            context.out.flush();
+            // NOTE(review): presumably flips the flag the progress stream checks via
+            // isOperationLogUpdatedAtLeastOnce() - confirm in Kyuubi's EventNotifier.
+            progressBar.operationLogShowedToUser();
+          }
+
+          if (!jobUrlExtracted) {
+            String appId = kyuubiConn.getEngineId();
+            String appUrl = kyuubiConn.getEngineUrl();
+            String jobUrl = null;
+            // Prefer the customized template; fall back to the engine URL returned by Kyuubi.
+            if (StringUtils.isNotBlank(jobUrlTemplate) && 
StringUtils.isNotBlank(appId)) {
+              jobUrl = jobUrlTemplate.replace("{{applicationId}}", appId);
+            } else if (StringUtils.isNotBlank(appUrl)) {
+              jobUrl = appUrl;
+            }
+            // jobUrl stays null until an engine id (with template) or engine URL is
+            // available, in which case the lookup is retried on the next iteration.
+            if (jobUrl != null) {
+              LOGGER.info("Detected Kyuubi engine URL: {}", jobUrl);
+              Map<String, String> infos = new HashMap<>();
+              infos.put("jobUrl", jobUrl);
+              infos.put("label", "KYUUBI JOB");
+              infos.put("tooltip", "View Application Web UI");
+              infos.put("noteId", context.getNoteId());
+              infos.put("paraId", context.getParagraphId());
+              context.getIntpEventClient().onParaInfosReceived(infos);
+              jobUrlExtracted = true;
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOGGER.warn("Kyuubi monitor thread is interrupted", e);
+        // Restore the interrupt status that was cleared when the exception was thrown.
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        // Monitoring is best effort: any other failure is logged and ends the thread.
+        LOGGER.warn("Fail to monitor KyuubiStatement", e);
+      }
+
+      LOGGER.info("KyuubiMonitor-Thread is finished");
+    });
+    thread.setName("KyuubiMonitor-Thread");
+    thread.setDaemon(true);
+    thread.start();
+    LOGGER.info("Start KyuubiMonitor-Thread for sql: {}", kyuubiStmt);
+
+    // Attach a progress stream that renders to the paragraph output.
+    progressBar.setInPlaceUpdateStream(kyuubiStmt, context.out);
+  }
+}
diff --git 
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java
new file mode 100644
index 0000000000..f80a4b0d09
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+/**
+ * Owns one {@link InPlaceUpdateStream.EventNotifier} and hands it to every
+ * {@link BeelineInPlaceUpdateStream} it creates, so that callers of
+ * {@link #operationLogShowedToUser()} and the created stream share the same notifier.
+ */
+public class ProgressBar {
+  // Notifier shared with the streams created by getInPlaceUpdateStream.
+  private InPlaceUpdateStream.EventNotifier eventNotifier;
+  // Stream created by the most recent getInPlaceUpdateStream call; null before the first call.
+  private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream;
+
+  public ProgressBar() {
+    this.eventNotifier = new InPlaceUpdateStream.EventNotifier();
+  }
+
+  /**
+   * Forwards to {@code operationLogShowedToUser()} on the shared notifier.
+   */
+  public void operationLogShowedToUser() {
+    this.eventNotifier.operationLogShowedToUser();
+  }
+
+  /**
+   * Creates a new {@link BeelineInPlaceUpdateStream} that writes to {@code out}
+   * (wrapped in a {@link PrintStream}) and uses the shared notifier.
+   * Despite the getter-style name, every call creates a new stream and replaces the
+   * one previously stored.
+   *
+   * @param out stream the progress output is written to
+   * @return the newly created stream
+   */
+  public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) {
+    beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream(
+            new PrintStream(out),
+            eventNotifier
+    );
+    return beelineInPlaceUpdateStream;
+  }
+
+  /**
+   * @return the stream created by the last {@link #getInPlaceUpdateStream(OutputStream)}
+   *         call, or {@code null} if it has not been called yet
+   */
+  public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() {
+    return beelineInPlaceUpdateStream;
+  }
+
+  /**
+   * Creates a stream via {@link #getInPlaceUpdateStream(OutputStream)} and installs it
+   * on the given statement.
+   *
+   * @param kyuubiStmt statement that receives the stream
+   * @param out        stream the progress output is written to
+   */
+  public void setInPlaceUpdateStream(KyuubiStatement kyuubiStmt, OutputStream 
out){
+    kyuubiStmt.setInPlaceUpdateStream(this.getInPlaceUpdateStream(out));
+  }
+}
diff --git a/jdbc/src/main/resources/interpreter-setting.json 
b/jdbc/src/main/resources/interpreter-setting.json
index a723660d6e..3a22ff0a9b 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -150,6 +150,27 @@
         "defaultValue": true,
         "description": "Set application tag for applications started by hive 
engines",
         "type": "checkbox"
+      },
+      "zeppelin.jdbc.kyuubi.timeout.threshold": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.kyuubi.timeout.threshold",
+        "defaultValue": "60000",
+        "description": "Timeout threshold for Kyuubi jobs",
+        "type": "number"
+      },
+      "zeppelin.jdbc.kyuubi.monitor.query_interval": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.kyuubi.monitor.query_interval",
+        "defaultValue": "1000",
+        "description": "Query interval for kyuubi statement",
+        "type": "number"
+      },
+      "zeppelin.jdbc.kyuubi.jobUrl.template": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.kyuubi.jobUrl.template",
+        "defaultValue": "",
+        "description": "The Kyuubi engine URL pattern, supports 
{{applicationId}} as placeholder",
+        "type": "string"
       }
     },
     "editor": {

Reply via email to