KYLIN-1827 Send mail notification when a runtime exception is thrown during cube build/merge
Signed-off-by: shaofengshi <shaofeng...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0954176a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0954176a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0954176a Branch: refs/heads/KYLIN-1726 Commit: 0954176adb50df25d0995a7fbeb68e64ee7aed79 Parents: c59d63d Author: Ma,Gang <ga...@ebay.com> Authored: Fri Jul 15 11:27:40 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Sep 12 11:23:19 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/AbstractExecutable.java | 123 ++++++++++++++----- .../org/apache/kylin/engine/mr/CubingJob.java | 40 ++++++ 2 files changed, 129 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0954176a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 09f9b54..1eee5da 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.MailService; import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.exception.PersistentException; import org.apache.kylin.job.impl.threadpool.DefaultContext; import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; @@ -99,31 +100,62 @@ public abstract class AbstractExecutable implements Executable, Idempotent { 
Preconditions.checkArgument(executableContext instanceof DefaultContext); ExecuteResult result = null; + try { + onExecuteStart(executableContext); + Throwable exception; + do { + if (retry > 0) { + logger.info("Retry " + retry); + } + exception = null; + result = null; + try { + result = doWork(executableContext); + } catch (Throwable e) { + logger.error("error running Executable: " + this.toString()); + exception = e; + } + retry++; + } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true); - onExecuteStart(executableContext); - Throwable exception; - do { - if (retry > 0) { - logger.info("Retry " + retry); + if (exception != null) { + onExecuteError(exception, executableContext); + throw new ExecuteException(exception); } - exception = null; - result = null; - try { - result = doWork(executableContext); - } catch (Throwable e) { - logger.error("error running Executable: " + this.toString()); - exception = e; + + onExecuteFinished(result, executableContext); + } catch (Exception e) { + if (isMetaDataPersistException(e)){ + handleMetaDataPersistException(e); + } + if (e instanceof ExecuteException){ + throw e; + } else { + throw new ExecuteException(e); } - retry++; - } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true); + } + return result; + } + + protected void handleMetaDataPersistException(Exception e) { + // do nothing. 
+ } - if (exception != null) { - onExecuteError(exception, executableContext); - throw new ExecuteException(exception); + private boolean isMetaDataPersistException(Exception e) { + if (e instanceof PersistentException){ + return true; } - onExecuteFinished(result, executableContext); - return result; + Throwable t = e.getCause(); + int depth = 0; + while (t!= null && depth<5) { + depth ++; + if (t instanceof PersistentException){ + return true; + } + t = t.getCause(); + } + return false; } protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException; @@ -209,29 +241,52 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected final void notifyUserStatusChange(ExecutableContext context, ExecutableState state) { try { - List<String> users = Lists.newArrayList(); - users.addAll(getNotifyList()); final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final String[] adminDls = kylinConfig.getAdminDls(); - if (null != adminDls) { - for (String adminDl : adminDls) { - users.add(adminDl); - } - } + List<String> users = getAllNofifyUsers(kylinConfig); if (users.isEmpty()) { logger.warn("no need to send email, user list is empty"); return; } final Pair<String, String> email = formatNotifications(context, state); - if (email == null) { - logger.warn("no need to send email, content is null"); + doSendMail(kylinConfig,users,email); + } catch (Exception e) { + logger.error("error send email", e); + } + } + + private List<String> getAllNofifyUsers(KylinConfig kylinConfig){ + List<String> users = Lists.newArrayList(); + users.addAll(getNotifyList()); + final String[] adminDls = kylinConfig.getAdminDls(); + if (null != adminDls) { + for (String adminDl : adminDls) { + users.add(adminDl); + } + } + return users; + } + + private void doSendMail(KylinConfig kylinConfig, List<String> users, Pair<String,String> email){ + if (email == null) { + logger.warn("no need to send email, content is null"); + return; + } 
+ logger.info("prepare to send email to:" + users); + logger.info("job name:" + getName()); + logger.info("submitter:" + getSubmitter()); + logger.info("notify list:" + users); + new MailService(kylinConfig).sendMail(users, email.getLeft(), email.getRight()); + } + + protected void sendMail(Pair<String, String> email) { + try { + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + List<String> users = getAllNofifyUsers(kylinConfig); + if (users.isEmpty()) { + logger.warn("no need to send email, user list is empty"); return; } - logger.info("prepare to send email to:" + users); - logger.info("job name:" + getName()); - logger.info("submitter:" + getSubmitter()); - logger.info("notify list:" + users); - new MailService(kylinConfig).sendMail(users, email.getLeft(), email.getRight()); + doSendMail(kylinConfig, users, email); } catch (Exception e) { logger.error("error send email", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0954176a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index c9ffe14..9c7f57a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -18,6 +18,8 @@ package org.apache.kylin.engine.mr; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; @@ -183,6 +185,44 @@ public class CubingJob extends DefaultChainedExecutable { super.onExecuteFinished(result, executableContext); } + /** + * build fail because the metadata store has problem. 
+ * @param exception + */ + @Override + protected void handleMetaDataPersistException(Exception exception) { + String title = "[ERROR] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " + CubingExecutableUtil.getCubeName(this.getParams()); + String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE; + final String UNKNOWN = "UNKNOWN"; + String errMsg = null; + if (exception != null) { + final StringWriter out = new StringWriter(); + exception.printStackTrace(new PrintWriter(out)); + errMsg = out.toString(); + } + + content = content.replaceAll("\\$\\{job_name\\}", getName()); + content = content.replaceAll("\\$\\{result\\}", ExecutableState.ERROR.toString()); + content = content.replaceAll("\\$\\{env_name\\}", getDeployEnvName()); + content = content.replaceAll("\\$\\{project_name\\}", getProjectName()); + content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams())); + content = content.replaceAll("\\$\\{source_records_count\\}", UNKNOWN); + content = content.replaceAll("\\$\\{start_time\\}", UNKNOWN); + content = content.replaceAll("\\$\\{duration\\}", UNKNOWN); + content = content.replaceAll("\\$\\{mr_waiting\\}", UNKNOWN); + content = content.replaceAll("\\$\\{last_update_time\\}", UNKNOWN); + content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter")); + content = content.replaceAll("\\$\\{error_log\\}", Matcher.quoteReplacement(StringUtil.noBlank(errMsg, "no error message"))); + + try { + InetAddress inetAddress = InetAddress.getLocalHost(); + content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName()); + } catch (UnknownHostException e) { + logger.warn(e.getLocalizedMessage(), e); + } + sendMail(Pair.of(title,content)); + } + public long getMapReduceWaitTime() { return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L); }