This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 19808172ed [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi (#4770) 19808172ed is described below commit 19808172ed0dc589ef35e847cb4e9a276769fd77 Author: Cheng Pan <cheng...@apache.org> AuthorDate: Wed Jul 3 22:11:18 2024 +0800 [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi (#4770) * [ZEPPELIN-6027] Enhanced Integration with Apache Kyuubi * fix style --- jdbc/pom.xml | 8 ++ .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 13 ++- .../jdbc/kyuubi/BeelineInPlaceUpdateStream.java | 118 +++++++++++++++++++++ .../apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java | 118 +++++++++++++++++++++ .../apache/zeppelin/jdbc/kyuubi/ProgressBar.java | 53 +++++++++ jdbc/src/main/resources/interpreter-setting.json | 21 ++++ 6 files changed, 328 insertions(+), 3 deletions(-) diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 35be95c71d..f60b66aae6 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -39,6 +39,7 @@ <h2.version>2.2.220</h2.version> <commons.dbcp2.version>2.0.1</commons.dbcp2.version> <hive3.version>3.1.3</hive3.version> + <kyuubi.version>1.9.1</kyuubi.version> <!--test library versions--> <mockrunner.jdbc.version>1.0.8</mockrunner.jdbc.version> @@ -135,6 +136,13 @@ </exclusions> </dependency> + <dependency> + <groupId>org.apache.kyuubi</groupId> + <artifactId>kyuubi-hive-jdbc-shaded</artifactId> + <version>${kyuubi.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>net.jodah</groupId> <artifactId>concurrentunit</artifactId> diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index e31c6e4a5d..b4cfba25b4 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -35,6 +35,7 @@ import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; import 
org.apache.zeppelin.interpreter.ZeppelinContext; import org.apache.zeppelin.interpreter.util.SqlSplitter; import org.apache.zeppelin.jdbc.hive.HiveUtils; +import org.apache.zeppelin.jdbc.kyuubi.KyuubiUtils; import org.apache.zeppelin.tabledata.TableDataUtils; import org.apache.zeppelin.util.PropertiesUtil; import org.slf4j.Logger; @@ -825,10 +826,16 @@ public class JDBCInterpreter extends KerberosInterpreter { String jdbcURL = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY); String driver = getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY); - if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://") - && driver != null && driver.equals("org.apache.hive.jdbc.HiveDriver")) { - HiveUtils.startHiveMonitorThread(statement, context, + if (jdbcURL != null && driver != null) { + if (driver.equals("org.apache.hive.jdbc.HiveDriver") && + jdbcURL.startsWith("jdbc:hive2://")) { + HiveUtils.startHiveMonitorThread(statement, context, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); + } else if (driver.equals("org.apache.kyuubi.jdbc.KyuubiHiveDriver") && + (jdbcURL.startsWith("jdbc:kyuubi://") || jdbcURL.startsWith("jdbc:hive2://"))) { + KyuubiUtils.startMonitorThread(connection, statement, context, + Boolean.parseBoolean(getProperty("kyuubi.log.display", "true")), this); + } } boolean isResultSetAvailable = statement.execute(sqlToExecute); getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java new file mode 100644 index 0000000000..4218461373 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/BeelineInPlaceUpdateStream.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.jdbc.kyuubi; + + +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus; +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProgressUpdateResp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintStream; +import java.util.List; + +public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream { + private static final Logger LOGGER = LoggerFactory.getLogger(BeelineInPlaceUpdateStream.class); + + private InPlaceUpdate inPlaceUpdate; + private EventNotifier notifier; + private long lastUpdateTimestamp; + + public BeelineInPlaceUpdateStream(PrintStream out, + EventNotifier notifier) { + this.inPlaceUpdate = new InPlaceUpdate(out); + this.notifier = notifier; + } + + @Override + public void update(TProgressUpdateResp response) { + if (response == null || response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) { + /* + we set it to completed if there is nothing the server has to report + for example, DDL statements + */ + notifier.progressBarCompleted(); + } else if 
(notifier.isOperationLogUpdatedAtLeastOnce()) { + /* + try to render the in-place update progress bar only if the operation log has been updated at + least once, + as this will hopefully allow printing the metadata information like query id, + application id + etc. These notifiers have to be removed when the operation logs get merged into + GetOperationStatus + */ + lastUpdateTimestamp = System.currentTimeMillis(); + LOGGER.info("update progress: {}", response.getProgressedPercentage()); + inPlaceUpdate.render(new ProgressMonitorWrapper(response)); + } + } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + @Override + public EventNotifier getEventNotifier() { + return notifier; + } + + static class ProgressMonitorWrapper implements ProgressMonitor { + private TProgressUpdateResp response; + + ProgressMonitorWrapper(TProgressUpdateResp response) { + this.response = response; + } + + @Override + public List<String> headers() { + return response.getHeaderNames(); + } + + @Override + public List<List<String>> rows() { + return response.getRows(); + } + + @Override + public String footerSummary() { + return response.getFooterSummary(); + } + + @Override + public long startTime() { + return response.getStartTime(); + } + + @Override + public String executionStatus() { + throw new UnsupportedOperationException( + "This should never be used for anything. All the required data is " + + "available via other methods" + ); + } + + @Override + public double progressedPercentage() { + return response.getProgressedPercentage(); + } + } +} + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java new file mode 100644 index 0000000000..1d10845c46 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/KyuubiUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.jdbc.kyuubi; + +import org.apache.commons.dbcp2.DelegatingConnection; +import org.apache.commons.dbcp2.DelegatingStatement; +import org.apache.commons.lang3.StringUtils; +import org.apache.kyuubi.jdbc.hive.KyuubiConnection; +import org.apache.kyuubi.jdbc.hive.KyuubiStatement; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.jdbc.JDBCInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class includes Kyuubi-specific utilities, + * e.g. displaying Kyuubi job execution info.
+ * + */ +public class KyuubiUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(KyuubiUtils.class); + private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000; + /** + * Display Kyuubi job execution info + */ + public static void startMonitorThread(Connection conn, + Statement stmt, + InterpreterContext context, + boolean displayLog, + JDBCInterpreter jdbcInterpreter) { + KyuubiConnection kyuubiConn = (KyuubiConnection) ((DelegatingConnection<?>) + ((DelegatingConnection<?>) conn).getDelegate()).getDelegate(); + KyuubiStatement kyuubiStmt = (KyuubiStatement) + ((DelegatingStatement) ((DelegatingStatement) stmt).getDelegate()).getDelegate(); + // progressBar is declared final because it is captured by the monitor thread below. + final ProgressBar progressBar = new ProgressBar(); + final long queryInterval = Long.parseLong( + jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.monitor.query_interval", + DEFAULT_QUERY_PROGRESS_INTERVAL + "")); + Thread thread = new Thread(() -> { + String jobUrlTemplate = jdbcInterpreter.getProperty("zeppelin.jdbc.kyuubi.jobUrl.template"); + boolean jobUrlExtracted = false; + + try { + while (kyuubiStmt.hasMoreLogs() && !kyuubiStmt.isClosed() && !Thread.interrupted()) { + Thread.sleep(queryInterval); + List<String> logs = kyuubiStmt.getExecLog(); + String logsOutput = StringUtils.join(logs, System.lineSeparator()); + LOGGER.debug("Kyuubi job output: {}", logsOutput); + boolean displayLogProperty = context.getBooleanLocalProperty("displayLog", displayLog); + if (displayLogProperty && !StringUtils.isBlank(logsOutput)) { + context.out.write(logsOutput + "\n"); + context.out.flush(); + progressBar.operationLogShowedToUser(); + } + + if (!jobUrlExtracted) { + String appId = kyuubiConn.getEngineId(); + String appUrl = kyuubiConn.getEngineUrl(); + String jobUrl = null; + // prefer the customized template, and fall back to the engine URL returned by Kyuubi + if (StringUtils.isNotBlank(jobUrlTemplate) &&
StringUtils.isNotBlank(appId)) { + jobUrl = jobUrlTemplate.replace("{{applicationId}}", appId); + } else if (StringUtils.isNotBlank(appUrl)) { + jobUrl = appUrl; + } + if (jobUrl != null) { + LOGGER.info("Detected Kyuubi engine URL: {}", jobUrl); + Map<String, String> infos = new HashMap<>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "KYUUBI JOB"); + infos.put("tooltip", "View Application Web UI"); + infos.put("noteId", context.getNoteId()); + infos.put("paraId", context.getParagraphId()); + context.getIntpEventClient().onParaInfosReceived(infos); + jobUrlExtracted = true; + } + } + } + } catch (InterruptedException e) { + LOGGER.warn("Kyuubi monitor thread is interrupted", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.warn("Fail to monitor KyuubiStatement", e); + } + + LOGGER.info("KyuubiMonitor-Thread is finished"); + }); + thread.setName("KyuubiMonitor-Thread"); + thread.setDaemon(true); + thread.start(); + LOGGER.info("Start KyuubiMonitor-Thread for sql: {}", kyuubiStmt); + + progressBar.setInPlaceUpdateStream(kyuubiStmt, context.out); + } +} diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java new file mode 100644 index 0000000000..f80a4b0d09 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/kyuubi/ProgressBar.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.jdbc.kyuubi; + +import org.apache.kyuubi.jdbc.hive.KyuubiStatement; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; + +import java.io.OutputStream; +import java.io.PrintStream; + +public class ProgressBar { + private InPlaceUpdateStream.EventNotifier eventNotifier; + private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream; + + public ProgressBar() { + this.eventNotifier = new InPlaceUpdateStream.EventNotifier(); + } + + public void operationLogShowedToUser() { + this.eventNotifier.operationLogShowedToUser(); + } + + public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) { + beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream( + new PrintStream(out), + eventNotifier + ); + return beelineInPlaceUpdateStream; + } + + public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() { + return beelineInPlaceUpdateStream; + } + + public void setInPlaceUpdateStream(KyuubiStatement kyuubiStmt, OutputStream out){ + kyuubiStmt.setInPlaceUpdateStream(this.getInPlaceUpdateStream(out)); + } +} diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index a723660d6e..3a22ff0a9b 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -150,6 +150,27 @@ "defaultValue": true, "description": "Set application tag for applications started by hive engines", "type": "checkbox" + }, + "zeppelin.jdbc.kyuubi.timeout.threshold": { + "envName": null, + "propertyName": 
"zeppelin.jdbc.kyuubi.timeout.threshold", + "defaultValue": "60000", + "description": "Timeout for kyuubi job timeout", + "type": "number" + }, + "zeppelin.jdbc.kyuubi.monitor.query_interval": { + "envName": null, + "propertyName": "zeppelin.jdbc.kyuubi.monitor.query_interval", + "defaultValue": "1000", + "description": "Query interval for kyuubi statement", + "type": "number" + }, + "zeppelin.jdbc.kyuubi.jobUrl.template": { + "envName": null, + "propertyName": "zeppelin.jdbc.kyuubi.jobUrl.template", + "defaultValue": "", + "description": "The Kyuubi engine URL pattern, supports {{applicationId}} as placeholder", + "type": "string" } }, "editor": {