This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 0202273  [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
0202273 is described below

commit 02022732c8b73050576c1b21395755ffafe50c77
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Oct 21 10:38:51 2020 +0800

    [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job

    ### What is this PR for?
    This PR cancels the paragraph when there are not sufficient resources for hive jobs in the jdbc interpreter.

    ### What type of PR is it?
    [Improvement]

    ### Todos
    * [ ] - Task

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5100

    ### How should this be tested?
    * Manually tested

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No

    Author: Jeff Zhang <zjf...@apache.org>

    Closes #3948 from zjffdu/ZEPPELIN-5100 and squashes the following commits:

    f0542ee29 [Jeff Zhang] [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
---
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 17 ++++++++++--
 .../org/apache/zeppelin/jdbc/hive/HiveUtils.java  | 31 +++++++++++++++++++---
 jdbc/src/main/resources/interpreter-setting.json  |  7 +++++
 .../apache/zeppelin/jdbc/hive/HiveUtilsTest.java  |  2 +-
 4 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 5d792ec..e72481e 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -125,6 +125,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   private static final char TAB = '\t';
   private static final String TABLE_MAGIC_TAG = "%table ";
   private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
+  private static final String CANCEL_REASON = "cancel_reason";
 
   static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
 
@@ -160,7 +161,6 @@
 
   private int maxLineResults;
   private int maxRows;
-
   private SqlSplitter sqlSplitter;
 
   private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
@@ -735,7 +735,7 @@
       String jdbcURL = getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY);
       if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://")) {
         HiveUtils.startHiveMonitorThread(statement, context,
-                Boolean.parseBoolean(getProperty("hive.log.display", "true")));
+                Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
       }
       boolean isResultSetAvailable = statement.execute(sqlToExecute);
       getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
@@ -932,8 +932,21 @@
     } catch (SQLException e) {
       LOGGER.error("Error while cancelling...", e);
     }
+
+    String cancelReason = context.getLocalProperties().get(CANCEL_REASON);
+    if (StringUtils.isNotBlank(cancelReason)) {
+      try {
+        context.out.write(cancelReason);
+      } catch (IOException e) {
+        LOGGER.error("Fail to write cancel reason");
+      }
+    }
   }
 
+  public void cancel(InterpreterContext context, String errorMessage) {
+    context.getLocalProperties().put(CANCEL_REASON, errorMessage);
+    cancel(context);
+  }
   /**
    *
    *
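Editor's note: to make the JDBCInterpreter change above easier to follow, here is a minimal, self-contained sketch of the "cancel reason" handoff that the new cancel(context, errorMessage) overload introduces: the reason is stashed under the "cancel_reason" local property, and the existing cancel path reports it after the statement is cancelled. The class below, its plain Map, and its main method are illustrative stand-ins, not Zeppelin APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the pattern added to JDBCInterpreter above;
// a plain Map plays the role of the InterpreterContext local properties.
public class CancelReasonSketch {
  private static final String CANCEL_REASON = "cancel_reason";
  private final Map<String, String> localProperties = new HashMap<>();

  // Mirrors the new cancel(context, errorMessage): record why, then cancel.
  public void cancel(String errorMessage) {
    localProperties.put(CANCEL_REASON, errorMessage);
    cancel();
  }

  // Mirrors the existing cancel(context): cancel the statement, then surface the reason.
  public void cancel() {
    // ... statement.cancel() would happen here in the real interpreter ...
    String reason = localProperties.get(CANCEL_REASON);
    if (reason != null && !reason.isEmpty()) {
      System.out.println(reason); // JDBCInterpreter writes this to context.out instead
    }
  }

  public static void main(String[] args) {
    new CancelReasonSketch()
        .cancel("Cancel this job as no more log is produced in the last 60 seconds");
  }
}
```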
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index 58fc2f0..ad253f0 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.jdbc.HiveStatement;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.jdbc.JDBCInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,8 @@ public class HiveUtils {
    */
  public static void startHiveMonitorThread(Statement stmt,
                                            InterpreterContext context,
-                                           boolean displayLog) {
+                                           boolean displayLog,
+                                           JDBCInterpreter jdbcInterpreter) {
    HiveStatement hiveStmt = (HiveStatement)
            ((DelegatingStatement) ((DelegatingStatement) stmt).getDelegate()).getDelegate();
    String hiveVersion = HiveVersionInfo.getVersion();
@@ -66,8 +68,11 @@
    }
    // need to use final variable progressBar in thread, so need progressBarTemp here.
    final ProgressBar progressBar = progressBarTemp;
-
+    final long timeoutThreshold = Long.parseLong(
+        jdbcInterpreter.getProperty("zeppelin.jdbc.hive.timeout.threshold", "" + 60 * 1000));
    Thread thread = new Thread(() -> {
+      boolean jobLaunched = false;
+      long jobLastActiveTime = System.currentTimeMillis();
      while (hiveStmt.hasMoreLogs() && !Thread.interrupted()) {
        try {
          List<String> logs = hiveStmt.getQueryLog();
@@ -81,7 +86,7 @@
          if (!StringUtils.isBlank(logsOutput) && progressBar != null && displayLogProperty) {
            progressBar.operationLogShowedToUser();
          }
-          Optional<String> jobURL = extractJobURL(logsOutput);
+          Optional<String> jobURL = extractMRJobURL(logsOutput);
          if (jobURL.isPresent()) {
            Map<String, String> infos = new HashMap<>();
            infos.put("jobUrl", jobURL.get());
@@ -91,6 +96,24 @@
            infos.put("paraId", context.getParagraphId());
            context.getIntpEventClient().onParaInfosReceived(infos);
          }
+          if (logsOutput.contains("Launching Job")) {
+            jobLaunched = true;
+          }
+
+          if (jobLaunched) {
+            if (StringUtils.isNotBlank(logsOutput)) {
+              jobLastActiveTime = System.currentTimeMillis();
+            } else {
+              if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
+                String errorMessage = "Cancel this job as no more log is produced in the " +
+                        "last " + timeoutThreshold / 1000 + " seconds, " +
+                        "maybe it is because no yarn resources";
+                LOGGER.warn(errorMessage);
+                jdbcInterpreter.cancel(context, errorMessage);
+                break;
+              }
+            }
+          }
          // refresh logs every 1 second.
          Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
        } catch (Exception e) {
@@ -118,7 +141,7 @@
  }
 
  // extract hive job url from logs, it only works for MR engine.
-  static Optional<String> extractJobURL(String log) {
+  static Optional<String> extractMRJobURL(String log) {
    Matcher matcher = JOBURL_PATTERN.matcher(log);
    if (matcher.matches()) {
      String jobURL = matcher.group(1);
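Editor's note: the heart of the HiveUtils change is the idle-timeout check inside the monitor thread. The standalone sketch below isolates that logic under simplified assumptions: pollLogs() and cancelJob() are hypothetical stand-ins for HiveStatement.getQueryLog() and JDBCInterpreter.cancel(context, errorMessage), and the threshold is shortened for the demo (the real default is 60000 ms).

```java
import java.util.concurrent.TimeUnit;

// Standalone sketch of the idle-timeout logic added to startHiveMonitorThread above.
public class HiveIdleTimeoutSketch {
  private static int polls = 0;

  public static void main(String[] args) throws InterruptedException {
    long timeoutThreshold = 5_000;   // demo value; zeppelin.jdbc.hive.timeout.threshold defaults to 60000
    boolean jobLaunched = false;
    long jobLastActiveTime = System.currentTimeMillis();

    while (true) {
      String logsOutput = pollLogs();
      if (logsOutput.contains("Launching Job")) {
        jobLaunched = true;                                // only watch for stalls once the job has launched
      }
      if (jobLaunched) {
        if (!logsOutput.isEmpty()) {
          jobLastActiveTime = System.currentTimeMillis();  // fresh output, reset the idle clock
        } else if (System.currentTimeMillis() - jobLastActiveTime > timeoutThreshold) {
          cancelJob("no log produced in the last " + timeoutThreshold / 1000
              + " seconds, maybe there are no yarn resources");
          break;
        }
      }
      TimeUnit.SECONDS.sleep(1);                           // poll the query log once per second
    }
  }

  // Simulates a query log: one "Launching Job" line, then silence (a stalled job).
  private static String pollLogs() {
    return (polls++ == 0) ? "INFO : Launching Job 1 out of 1" : "";
  }

  private static void cancelJob(String reason) {
    System.out.println("Cancelling paragraph: " + reason);
  }
}
```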
diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json
index 5aac46e..f203ac1 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -122,6 +122,13 @@
         "defaultValue": "1000",
         "description": "Maximum number of rows fetched from the query.",
         "type": "number"
+      },
+      "zeppelin.jdbc.hive.timeout.threshold": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.hive.timeout.threshold",
+        "defaultValue": "60000",
+        "description": "Timeout for hive job timeout",
+        "type": "number"
       }
     },
     "editor": {
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
index f0f6269..058f7eb 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
@@ -28,7 +28,7 @@ public class HiveUtilsTest {
 
   @Test
   public void testJobURL() {
-    Optional<String> jobURL = HiveUtils.extractJobURL(
+    Optional<String> jobURL = HiveUtils.extractMRJobURL(
        "INFO : The url to track the job: " +
        "http://localhost:8088/proxy/application_1591195707498_0064/\n" +
        "INFO : Starting Job = job_1591195707498_0064, " +
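Editor's note: the new threshold is registered above as an ordinary JDBC interpreter property, so it can be adjusted (value in milliseconds) on the interpreter settings page or in the interpreter properties. For example, to wait five minutes of log silence before cancelling (the value below is only an example):

```
zeppelin.jdbc.hive.timeout.threshold = 300000
```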