This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 19808172ed [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi
(#4770)
19808172ed is described below
commit 19808172ed0dc589ef35e847cb4e9a276769fd77
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jul 3 22:11:18 2024 +0800
[ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi (#4770)
* [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi
* fix style
---
jdbc/pom.xml | 8 ++
.../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 13 ++-
.../jdbc/kyuubi/BeelineInPlaceUpdateStream.java | 118 +++++++++++++++++++++
.../apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java | 118 +++++++++++++++++++++
.../apache/zeppelin/jdbc/kyuubi/ProgressBar.java | 53 +++++++++
jdbc/src/main/resources/interpreter-setting.json | 21 ++++
6 files changed, 328 insertions(+), 3 deletions(-)
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 35be95c71d..f60b66aae6 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -39,6 +39,7 @@
<h2.version>2.2.220</h2.version>
<commons.dbcp2.version>2.0.1</commons.dbcp2.version>
<hive3.version>3.1.3</hive3.version>
+ <kyuubi.version>1.9.1</kyuubi.version>
<!--test library versions-->
<mockrunner.jdbc.version>1.0.8</mockrunner.jdbc.version>
@@ -135,6 +136,13 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+ <version>${kyuubi.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index e31c6e4a5d..b4cfba25b4 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -35,6 +35,7 @@ import
org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.util.SqlSplitter;
import org.apache.zeppelin.jdbc.hive.HiveUtils;
+import org.apache.zeppelin.jdbc.kyuubi.KyuubiUtils;
import org.apache.zeppelin.tabledata.TableDataUtils;
import org.apache.zeppelin.util.PropertiesUtil;
import org.slf4j.Logger;
@@ -825,10 +826,16 @@ public class JDBCInterpreter extends KerberosInterpreter {
String jdbcURL =
getJDBCConfiguration(user).getProperty().getProperty(URL_KEY);
String driver =
getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY);
- if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://")
- && driver != null &&
driver.equals("org.apache.hive.jdbc.HiveDriver")) {
- HiveUtils.startHiveMonitorThread(statement, context,
+ if (jdbcURL != null && driver != null) {
+ if (driver.equals("org.apache.hive.jdbc.HiveDriver") &&
+ jdbcURL.startsWith("jdbc:hive2://")) {
+ HiveUtils.startHiveMonitorThread(statement, context,
Boolean.parseBoolean(getProperty("hive.log.display",
"true")), this);
+ } else if
(driver.equals("org.apache.kyuubi.jdbc.KyuubiHiveDriver") &&
+ (jdbcURL.startsWith("jdbc:kyuubi://") ||
jdbcURL.startsWith("jdbc:hive2://"))) {
+ KyuubiUtils.startMonitorThread(connection, statement, context,
+ Boolean.parseBoolean(getProperty("kyuubi.log.display",
"true")), this);
+ }
}
boolean isResultSetAvailable = statement.execute(sqlToExecute);
getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful();
diff --git
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000000..4218461373
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProgressUpdateResp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.util.List;
+
+public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BeelineInPlaceUpdateStream.class);
+
+ private InPlaceUpdate inPlaceUpdate;
+ private EventNotifier notifier;
+ private long lastUpdateTimestamp;
+
+ public BeelineInPlaceUpdateStream(PrintStream out,
+ EventNotifier notifier) {
+ this.inPlaceUpdate = new InPlaceUpdate(out);
+ this.notifier = notifier;
+ }
+
+ @Override
+ public void update(TProgressUpdateResp response) {
+ if (response == null ||
response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) {
+ /*
+ we set it to completed if there is nothing the server has to report
+ for example, DDL statements
+ */
+ notifier.progressBarCompleted();
+ } else if (notifier.isOperationLogUpdatedAtLeastOnce()) {
+ /*
+ Try to render the in-place progress bar only if the operation logs have
+ been updated at least once, as this will hopefully allow printing metadata
+ such as the query id and application id first. These notifiers have to be
+ removed once the operation logs get merged into GetOperationStatus.
+ */
+ lastUpdateTimestamp = System.currentTimeMillis();
+ LOGGER.info("update progress: {}", response.getProgressedPercentage());
+ inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+ }
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ @Override
+ public EventNotifier getEventNotifier() {
+ return notifier;
+ }
+
+ static class ProgressMonitorWrapper implements ProgressMonitor {
+ private TProgressUpdateResp response;
+
+ ProgressMonitorWrapper(TProgressUpdateResp response) {
+ this.response = response;
+ }
+
+ @Override
+ public List<String> headers() {
+ return response.getHeaderNames();
+ }
+
+ @Override
+ public List<List<String>> rows() {
+ return response.getRows();
+ }
+
+ @Override
+ public String footerSummary() {
+ return response.getFooterSummary();
+ }
+
+ @Override
+ public long startTime() {
+ return response.getStartTime();
+ }
+
+ @Override
+ public String executionStatus() {
+ throw new UnsupportedOperationException(
+ "This should never be used for anything. All the required data
is " +
+ "available via other methods"
+ );
+ }
+
+ @Override
+ public double progressedPercentage() {
+ return response.getProgressedPercentage();
+ }
+ }
+}
+
diff --git
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java
new file mode 100644
index 0000000000..1d10845c46
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+import org.apache.commons.dbcp2.DelegatingConnection;
+import org.apache.commons.dbcp2.DelegatingStatement;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kyuubi.jdbc.hive.KyuubiConnection;
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.jdbc.JDBCInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class includes Kyuubi-specific stuff,
+ * e.g. displaying Kyuubi job execution info.
+ *
+ */
+public class KyuubiUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KyuubiUtils.class);
+ private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000;
+ /**
+ * Display Kyuubi job execution info
+ */
+ public static void startMonitorThread(Connection conn,
+ Statement stmt,
+ InterpreterContext context,
+ boolean displayLog,
+ JDBCInterpreter jdbcInterpreter) {
+ KyuubiConnection kyuubiConn = (KyuubiConnection) ((DelegatingConnection<?>)
+ ((DelegatingConnection<?>) conn).getDelegate()).getDelegate();
+ KyuubiStatement kyuubiStmt = (KyuubiStatement)
+ ((DelegatingStatement) ((DelegatingStatement)
stmt).getDelegate()).getDelegate();
+ // progressBar is declared final because the monitor thread's lambda uses it.
+ final ProgressBar progressBar = new ProgressBar();
+ final long queryInterval = Long.parseLong(
+
jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.monitor.query_interval",
+ DEFAULT_QUERY_PROGRESS_INTERVAL + ""));
+ Thread thread = new Thread(() -> {
+ String jobUrlTemplate =
jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.jobUrl.template");
+ boolean jobUrlExtracted = false;
+
+ try {
+ while (kyuubiStmt.hasMoreLogs() && !kyuubiStmt.isClosed() &&
!Thread.interrupted()) {
+ Thread.sleep(queryInterval);
+ List<String> logs = kyuubiStmt.getExecLog();
+ String logsOutput = StringUtils.join(logs, System.lineSeparator());
+ LOGGER.debug("Kyuubi job output: {}", logsOutput);
+ boolean displayLogProperty =
context.getBooleanLocalProperty("displayLog", displayLog);
+ if (displayLogProperty && !StringUtils.isBlank(logsOutput)) {
+ context.out.write(logsOutput + "\n");
+ context.out.flush();
+ progressBar.operationLogShowedToUser();
+ }
+
+ if (!jobUrlExtracted) {
+ String appId = kyuubiConn.getEngineId();
+ String appUrl = kyuubiConn.getEngineUrl();
+ String jobUrl = null;
+ // prefer to use customized template, and fallback to Kyuubi
returned engine url
+ if (StringUtils.isNotBlank(jobUrlTemplate) &&
StringUtils.isNotBlank(appId)) {
+ jobUrl = jobUrlTemplate.replace("{{applicationId}}", appId);
+ } else if (StringUtils.isNotBlank(appUrl)) {
+ jobUrl = appUrl;
+ }
+ if (jobUrl != null) {
+ LOGGER.info("Detected Kyuubi engine URL: {}", jobUrl);
+ Map<String, String> infos = new HashMap<>();
+ infos.put("jobUrl", jobUrl);
+ infos.put("label", "KYUUBI JOB");
+ infos.put("tooltip", "View Application Web UI");
+ infos.put("noteId", context.getNoteId());
+ infos.put("paraId", context.getParagraphId());
+ context.getIntpEventClient().onParaInfosReceived(infos);
+ jobUrlExtracted = true;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Kyuubi monitor thread is interrupted", e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER.warn("Fail to monitor KyuubiStatement", e);
+ }
+
+ LOGGER.info("KyuubiMonitor-Thread is finished");
+ });
+ thread.setName("KyuubiMonitor-Thread");
+ thread.setDaemon(true);
+ thread.start();
+ LOGGER.info("Start KyuubiMonitor-Thread for sql: {}", kyuubiStmt);
+
+ progressBar.setInPlaceUpdateStream(kyuubiStmt, context.out);
+ }
+}
diff --git
a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java
new file mode 100644
index 0000000000..f80a4b0d09
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.jdbc.kyuubi;
+
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+public class ProgressBar {
+ private InPlaceUpdateStream.EventNotifier eventNotifier;
+ private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream;
+
+ public ProgressBar() {
+ this.eventNotifier = new InPlaceUpdateStream.EventNotifier();
+ }
+
+ public void operationLogShowedToUser() {
+ this.eventNotifier.operationLogShowedToUser();
+ }
+
+ public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) {
+ beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream(
+ new PrintStream(out),
+ eventNotifier
+ );
+ return beelineInPlaceUpdateStream;
+ }
+
+ public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() {
+ return beelineInPlaceUpdateStream;
+ }
+
+ public void setInPlaceUpdateStream(KyuubiStatement kyuubiStmt, OutputStream
out){
+ kyuubiStmt.setInPlaceUpdateStream(this.getInPlaceUpdateStream(out));
+ }
+}
diff --git a/jdbc/src/main/resources/interpreter-setting.json
b/jdbc/src/main/resources/interpreter-setting.json
index a723660d6e..3a22ff0a9b 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -150,6 +150,27 @@
"defaultValue": true,
"description": "Set application tag for applications started by hive
engines",
"type": "checkbox"
+ },
+ "zeppelin.jdbc.kyuubi.timeout.threshold": {
+ "envName": null,
+ "propertyName": "zeppelin.jdbc.kyuubi.timeout.threshold",
+ "defaultValue": "60000",
+ "description": "Timeout for kyuubi job timeout",
+ "type": "number"
+ },
+ "zeppelin.jdbc.kyuubi.monitor.query_interval": {
+ "envName": null,
+ "propertyName": "zeppelin.jdbc.kyuubi.monitor.query_interval",
+ "defaultValue": "1000",
+ "description": "Query interval for kyuubi statement",
+ "type": "number"
+ },
+ "zeppelin.jdbc.kyuubi.jobUrl.template": {
+ "envName": null,
+ "propertyName": "zeppelin.jdbc.kyuubi.jobUrl.template",
+ "defaultValue": "",
+ "description": "The Kyuubi engine URL pattern, supports
{{applicationId}} as placeholder",
+ "type": "string"
}
},
"editor": {