This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 1500ca3 [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez 1500ca3 is described below commit 1500ca3d6e469ec18f7fde5225bfd41f7d69adf9 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri Nov 27 14:12:29 2020 +0800 [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez ### What is this PR for? The root cause is that, for Hive on Tez, progress is reported via BeelineInPlaceUpdateStream instead of HiveStatement.getQueryLog(), so no log output was seen and the idle timeout cancelled a query that was still running. This PR also checks BeelineInPlaceUpdateStream's last update timestamp when updating `jobLastActiveTime`. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5146 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3984 from zjffdu/ZEPPELIN-5146 and squashes the following commits: 4e4168e93 [Jeff Zhang] save f6f021c28 [Jeff Zhang] [ZEPPELIN-5146]. 
Query was cancelled incorrectly for hive on tez (cherry picked from commit fc36cbc425fab30f5bba94db729f3c09fc31a730) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../jdbc/hive/BeelineInPlaceUpdateStream.java | 6 ++++++ .../org/apache/zeppelin/jdbc/hive/HiveUtils.java | 25 ++++++++++++++-------- .../org/apache/zeppelin/jdbc/hive/ProgressBar.java | 8 ++++++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java index cd7cbd1..c4367b1 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java @@ -35,6 +35,7 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream { private InPlaceUpdate inPlaceUpdate; private EventNotifier notifier; + private long lastUpdateTimestamp; public BeelineInPlaceUpdateStream(PrintStream out, InPlaceUpdateStream.EventNotifier notifier) { @@ -59,11 +60,16 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream { etc. 
have to remove these notifiers when the operation logs get merged into GetOperationStatus */ + lastUpdateTimestamp = System.currentTimeMillis(); LOGGER.info("update progress: " + response.getProgressedPercentage()); inPlaceUpdate.render(new ProgressMonitorWrapper(response)); } } + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + @Override public EventNotifier getEventNotifier() { return notifier; diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java index c7d47b9..026cbb8 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java @@ -102,17 +102,24 @@ public class HiveUtils { } if (jobLaunched) { + // Step 1. update jobLastActiveTime first + // Step 2. Check whether it is timeout. if (StringUtils.isNotBlank(logsOutput)) { jobLastActiveTime = System.currentTimeMillis(); - } else { - if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) { - String errorMessage = "Cancel this job as no more log is produced in the " + - "last " + timeoutThreshold / 1000 + " seconds, " + - "maybe it is because no yarn resources"; - LOGGER.warn(errorMessage); - jdbcInterpreter.cancel(context, errorMessage); - break; - } + } else if (progressBar.getBeelineInPlaceUpdateStream() != null && + progressBar.getBeelineInPlaceUpdateStream().getLastUpdateTimestamp() + > jobLastActiveTime) { + jobLastActiveTime = progressBar.getBeelineInPlaceUpdateStream() + .getLastUpdateTimestamp(); + } + + if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) { + String errorMessage = "Cancel this job as no more log is produced in the " + + "last " + timeoutThreshold / 1000 + " seconds, " + + "maybe it is because no yarn resources"; + LOGGER.warn(errorMessage); + jdbcInterpreter.cancel(context, errorMessage); + break; } } // refresh logs every 1 second. 
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java index 577a1f1..adf2846 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java @@ -27,6 +27,7 @@ import java.io.PrintStream; */ public class ProgressBar { private InPlaceUpdateStream.EventNotifier eventNotifier; + private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream; public ProgressBar() { this.eventNotifier = new InPlaceUpdateStream.EventNotifier(); @@ -37,10 +38,15 @@ public class ProgressBar { } public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) { - return new BeelineInPlaceUpdateStream( + beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream( new PrintStream(out), eventNotifier ); + return beelineInPlaceUpdateStream; + } + + public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() { + return beelineInPlaceUpdateStream; } public void setInPlaceUpdateStream(HiveStatement hiveStmt, OutputStream out){