This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 58956007a682c0a7f2c3087c3e59b5788e6c48df Author: hongrong.cao <hongrong....@kyligence.io> AuthorDate: Tue Nov 14 18:26:48 2023 +0800 fix discarded failed jobs do not receive email notifications, etc. Co-authored-by: sibing.zhang <sibing.zh...@qq.com> remove extra import --- .../apache/kylin/rest/service/ProjectService.java | 7 +++--- .../kylin/common/mail/MailNotificationType.java | 2 +- .../{JOB_SUCCEED.ftl => JOB_FINISHED.ftl} | 18 +++++++++++++++ .../apache/kylin/common/KylinConfigBaseTest.java | 2 +- .../kylin/job/execution/AbstractExecutable.java | 2 +- .../kylin/job/execution/DefaultExecutable.java | 3 +-- .../org/apache/kylin/job/mail/JobMailUtil.java | 3 ++- .../kylin/job/execution/JobMailUtilTest.java | 10 ++++---- .../kylin/job/execution/JobStatusChangedTest.java | 6 ++--- .../org/apache/kylin/rest/service/JobService.java | 22 +++++++++++------- .../apache/kylin/rest/service/JobServiceTest.java | 27 ++++++++++++++++++---- .../rest/controller/NProjectControllerTest.java | 2 +- .../kylin/rest/service/ProjectServiceTest.java | 4 ++-- 13 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java index cfeb892afb..9bee1b813e 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -447,8 +447,6 @@ public class ProjectService extends BasicService { overrideKylinProps.computeIfPresent(KYLIN_SOURCE_JDBC_PASS_KEY, (k, v) -> HIDDEN_VALUE); } - /** Changing the order of assignments is not allowed - */ @Transaction(project = 0) public void updateJobNotificationConfig(String project, JobNotificationConfigRequest request) { aclEvaluate.checkProjectAdminPermission(project); @@ -457,10 +455,11 @@ public class ProjectService extends BasicService { 
overrideKylinProps.put("kylin.job.notification-enable-states", String.join(",", Sets.newHashSet(request.getJobStatesNotification()))); overrideKylinProps.put("kylin.job.notification-on-job-error", null); + } else { + overrideKylinProps.put("kylin.job.notification-on-job-error", + String.valueOf(request.getJobErrorNotificationEnabled())); } - overrideKylinProps.put("kylin.job.notification-on-job-error", - String.valueOf(request.getJobErrorNotificationEnabled())); overrideKylinProps.put("kylin.job.notification-on-empty-data-load", String.valueOf(request.getDataLoadEmptyNotificationEnabled())); overrideKylinProps.put("kylin.job.notification-admin-emails", diff --git a/src/core-common/src/main/java/org/apache/kylin/common/mail/MailNotificationType.java b/src/core-common/src/main/java/org/apache/kylin/common/mail/MailNotificationType.java index 9a4ea8cb96..03f3088c38 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/mail/MailNotificationType.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/mail/MailNotificationType.java @@ -41,7 +41,7 @@ public enum MailNotificationType { .anyMatch(item -> getCorrespondingJobStates().equalsIgnoreCase(item)); } }, - JOB_SUCCEED("Job Succeed", "JOB_SUCCEED", "succeed") { + JOB_FINISHED("Job Finished", "JOB_FINISHED", "finished") { @Override public boolean needNotify(KylinConfig kylinconfig) { return needNotifyStates(kylinconfig).stream() diff --git a/src/core-common/src/main/resources/mail_templates/JOB_SUCCEED.ftl b/src/core-common/src/main/resources/mail_templates/JOB_FINISHED.ftl similarity index 88% rename from src/core-common/src/main/resources/mail_templates/JOB_SUCCEED.ftl rename to src/core-common/src/main/resources/mail_templates/JOB_FINISHED.ftl index 888f67758f..0c3ca9a423 100644 --- a/src/core-common/src/main/resources/mail_templates/JOB_SUCCEED.ftl +++ b/src/core-common/src/main/resources/mail_templates/JOB_FINISHED.ftl @@ -93,6 +93,24 @@ ${submitter} </td> </tr> + <tr> + <th width="30%" 
style="padding: 8px; + line-height: 1.42857143; + vertical-align: top; + border: 1px solid #ddd; + text-align: left; + font-size: medium; + font-style: normal;">Project + </th> + <td style="padding: 8px; + line-height: 1.42857143; + vertical-align: top; + border: 1px solid #ddd; + font-size: medium; + font-style: normal;"> + ${project_name} + </td> + </tr> <tr> <th width="30%" style="padding: 8px; line-height: 1.42857143; diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index 230f322e9f..4efb53ade4 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -638,7 +638,7 @@ class KylinConfigBaseTest { new PropertiesEntity("kylin.job.notification-on-job-error", "false", false)); map.put("getJobNotificationStates", new PropertiesEntity("kylin.job.notification-enable-states", - "error,succeed,discarded", new String[] { "error", "succeed", "discarded" })); + "error,finished,discarded", new String[] { "error", "finished", "discarded" })); map.put("getStorageResourceSurvivalTimeThreshold", new PropertiesEntity("kylin.storage.resource-survival-time-threshold", "7d", 7L * 24 * 60 * 60 * 1000)); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 7c01ea8d3a..1c8df9666e 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -590,7 +590,7 @@ public abstract class AbstractExecutable implements Executable { return false; } - protected boolean notifyUser(MailNotificationType notificationType) { + public boolean notifyUser(MailNotificationType notificationType) { 
Preconditions.checkState((this instanceof DefaultExecutable) || this.getParent() instanceof DefaultExecutable); val projectConfig = NProjectManager.getInstance(getConfig()).getProject(project).getConfig(); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java index 2e22e7ff4f..081dce9c11 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/DefaultExecutable.java @@ -229,11 +229,10 @@ public class DefaultExecutable extends AbstractExecutable implements ChainedExec switch (state) { case SUCCEED: updateToFinalState(ExecutableState.SUCCEED, this::afterUpdateOutput, result.getShortErrMsg()); - onStatusChange(MailNotificationType.JOB_SUCCEED); + onStatusChange(MailNotificationType.JOB_FINISHED); break; case DISCARDED: updateToFinalState(ExecutableState.DISCARDED, this::onExecuteDiscardHook, result.getShortErrMsg()); - onStatusChange(MailNotificationType.JOB_DISCARDED); break; case SUICIDAL: updateToFinalState(ExecutableState.SUICIDAL, this::onExecuteSuicidalHook, result.getShortErrMsg()); diff --git a/src/core-job/src/main/java/org/apache/kylin/job/mail/JobMailUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/mail/JobMailUtil.java index 45a019bf65..85bdd2d80d 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/mail/JobMailUtil.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/mail/JobMailUtil.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.mail; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.regex.Matcher; import org.apache.commons.lang3.StringUtils; @@ -55,7 +56,7 @@ public class JobMailUtil { data.put("job_name", executable.getName()); data.put("submitter", executable.getSubmitter()); data.put("project_name", executable.getProject()); - 
data.put("object", executable.getTargetSubject()); + data.put("object", Optional.ofNullable(executable.getTargetModelAlias()).orElse(executable.getTargetSubject())); data.put("start_time", DateFormat.formatToDateStr(executable.getStartTime(), DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS)); data.put("end_time", DateFormat.formatToDateStr(executable.getLastModified(), diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java index 726403a596..a144a08ece 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobMailUtilTest.java @@ -36,7 +36,7 @@ import lombok.val; public class JobMailUtilTest extends NLocalFileMetadataTestCase { private static final String DEFAULT_PROJECT = "default"; private static final String MAIL_TITLE_JOB_ERROR = "[Kylin System Notification]-[Job Error]"; - private static final String MAIL_TITLE_JOB_SUCCEED = "[Kylin System Notification]-[Job Succeed]"; + private static final String MAIL_TITLE_JOB_FINISHED = "[Kylin System Notification]-[Job Finished]"; private static final String MAIL_TITLE_JOB_DISCARDED = "[Kylin System Notification]-[Job Discarded]"; private static final String MAIL_TITLE_JOB_LOAD_EMPTY_DATA = "[Kylin System Notification]-[Job Load Empty Data]"; @@ -68,10 +68,10 @@ public class JobMailUtilTest extends NLocalFileMetadataTestCase { Assert.assertNotNull(mail.getSecond()); Assert.assertEquals(MAIL_TITLE_JOB_LOAD_EMPTY_DATA, mail.getFirst()); - // test job succeed - mail = JobMailUtil.createMail(MailNotificationType.JOB_SUCCEED, job); + // test job finished + mail = JobMailUtil.createMail(MailNotificationType.JOB_FINISHED, job); Assert.assertNotNull(mail.getSecond()); - Assert.assertEquals(MAIL_TITLE_JOB_SUCCEED, mail.getFirst()); + Assert.assertEquals(MAIL_TITLE_JOB_FINISHED, mail.getFirst()); // test job 
discarded mail = JobMailUtil.createMail(MailNotificationType.JOB_DISCARDED, job); @@ -81,7 +81,7 @@ public class JobMailUtilTest extends NLocalFileMetadataTestCase { // test create mail failed DefaultExecutableOnModel job2 = new DefaultExecutableOnModel(); job2.setProject(DEFAULT_PROJECT); - mail = JobMailUtil.createMail(MailNotificationType.JOB_SUCCEED, job2); + mail = JobMailUtil.createMail(MailNotificationType.JOB_FINISHED, job2); Assert.assertNull(mail); } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobStatusChangedTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobStatusChangedTest.java index b921ea66c4..9d820a716a 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/JobStatusChangedTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/JobStatusChangedTest.java @@ -111,11 +111,11 @@ public class JobStatusChangedTest extends LogOutputTestCase { overwriteSystemProp("kylin.job.notification-enabled", "true"); // test job state needs to be notified, but it is not configured - notified = job.onStatusChange(MailNotificationType.JOB_SUCCEED); + notified = job.onStatusChange(MailNotificationType.JOB_FINISHED); Assert.assertFalse(notified); - overwriteSystemProp("kylin.job.notification-enable-states", "ERROR,DISCARDED,SUCCEED"); - notified = job.onStatusChange(MailNotificationType.JOB_SUCCEED); + overwriteSystemProp("kylin.job.notification-enable-states", "ERROR,DISCARDED,FINISHED"); + notified = job.onStatusChange(MailNotificationType.JOB_FINISHED); Assert.assertTrue(containsLog("user list is empty, not need to notify users.")); Assert.assertFalse(notified); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index ca2664ec31..71d18e1771 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -63,6 +63,7 @@ import org.apache.kylin.common.exception.JobExceptionReason; import org.apache.kylin.common.exception.JobExceptionResolve; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.logging.SetLogCategory; +import org.apache.kylin.common.mail.MailNotificationType; import org.apache.kylin.common.metrics.MetricsCategory; import org.apache.kylin.common.metrics.MetricsGroup; import org.apache.kylin.common.metrics.MetricsName; @@ -77,6 +78,11 @@ import org.apache.kylin.common.scheduler.JobReadyNotifier; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StringHelper; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.common.JobUtil; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; @@ -134,11 +140,6 @@ import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; import io.kyligence.kap.secondstorage.SecondStorageUtil; import lombok.Getter; @@ -542,6 +543,10 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl return; } 
getManager(NExecutableManager.class, project).discardJob(job.getId()); + + if (getConfig().isMailEnabled()) { + job.notifyUser(MailNotificationType.JOB_DISCARDED); + } } /** @@ -831,8 +836,8 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl segmentSubStages.setStepRatio(stepRatio); // Put warning message into segment_sub_stages.info if exists - Optional<ExecutableStepResponse> warningStageRes = stageResponses.stream().filter(stageRes -> - stageRes.getStatus() == JobStatusEnum.WARNING).findFirst(); + Optional<ExecutableStepResponse> warningStageRes = stageResponses.stream() + .filter(stageRes -> stageRes.getStatus() == JobStatusEnum.WARNING).findFirst(); warningStageRes.ifPresent(res -> segmentSubStages.getInfo().put(NBatchConstants.P_WARNING_CODE, res.getInfo().getOrDefault(NBatchConstants.P_WARNING_CODE, null))); } @@ -850,7 +855,8 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl */ Set<JobStatusEnum> jobStatusEnums = Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED, JobStatusEnum.DISCARDED); - Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP, JobStatusEnum.WARNING); + Set<JobStatusEnum> jobFinishOrSkip = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP, + JobStatusEnum.WARNING); if (oldResponse.getStatus() != newResponse.getStatus() && !jobStatusEnums.contains(oldResponse.getStatus())) { if (jobStatusEnums.contains(newResponse.getStatus())) { diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 3d82dc7e45..7e076453fc 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -72,10 +72,14 @@ import 
org.apache.kylin.common.persistence.transaction.TransactionException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.LogOutputTestCase; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.engine.spark.job.NSparkExecutable; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.constant.JobActionEnum; import org.apache.kylin.job.constant.JobStatusEnum; @@ -146,10 +150,6 @@ import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.test.util.ReflectionTestUtils; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - //import io.kyligence.kap.clickhouse.MockSecondStorage; import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.engine.spark.job.NSparkSnapshotJob; @@ -159,7 +159,7 @@ import org.apache.kylin.engine.spark.job.step.NStageForBuild; import lombok.val; import lombok.var; -public class JobServiceTest extends NLocalFileMetadataTestCase { +public class JobServiceTest extends LogOutputTestCase { String project = "default"; String yarnAppId = "application_1554187389076_9296"; @@ -1469,6 +1469,23 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { } } + @Test + public void testDiscardJobAndNotify() { + NExecutableManager manager = NExecutableManager.getInstance(getTestConfig(), project); + val 
job = new DefaultExecutable(); + job.setProject(project); + manager.addJob(job); + + overwriteSystemProp("kylin.job.notification-enabled", "true"); + + UnitOfWork.doInTransactionWithRetry(() -> { + jobService.updateJobStatus(job.getId(), project, "DISCARD"); + return null; + }, project); + + Assert.assertTrue(containsLog("[Job Discarded] is not specified by user, not need to notify users.")); + } + @Test public void testCheckJobStatus() { jobService.checkJobStatus(Lists.newArrayList("RUNNING")); diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java index da3f22c1b5..12f047236e 100644 --- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java +++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java @@ -244,7 +244,7 @@ public class NProjectControllerTest extends NLocalFileMetadataTestCase { public void testUpdateJobNotificationConfig() throws Exception { val request = new JobNotificationConfigRequest(); - request.setJobStatesNotification(Arrays.asList("Succeed", "Error", "Discard")); + request.setJobStatesNotification(Arrays.asList("finished", "error", "discarded")); request.setDataLoadEmptyNotificationEnabled(true); request.setJobNotificationEmails(Arrays.asList("f...@g.com")); diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 132b7b0acf..10e665c3c4 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -369,7 +369,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { var response = projectService.getProjectConfig(project); 
val jobNotificationConfigRequest = new JobNotificationConfigRequest(); jobNotificationConfigRequest.setDataLoadEmptyNotificationEnabled(false); - jobNotificationConfigRequest.setJobStatesNotification(Lists.newArrayList("Succeed", "Error", "Discard")); + jobNotificationConfigRequest.setJobStatesNotification(Lists.newArrayList("finished", "error", "discarded")); jobNotificationConfigRequest.setJobNotificationEmails( Lists.newArrayList("us...@kylin.io", "us...@kylin.io", "us...@kylin.io")); projectService.updateJobNotificationConfig(project, jobNotificationConfigRequest); @@ -772,7 +772,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { val jobNotificationConfigRequest = new JobNotificationConfigRequest(); jobNotificationConfigRequest.setDataLoadEmptyNotificationEnabled(true); - jobNotificationConfigRequest.setJobStatesNotification(Lists.newArrayList("Succeed", "Error", "Discard")); + jobNotificationConfigRequest.setJobStatesNotification(Lists.newArrayList("finished", "error", "discarded")); jobNotificationConfigRequest.setJobNotificationEmails( Lists.newArrayList("us...@kylin.io", "us...@kylin.io", "us...@kylin.io")); projectService.updateJobNotificationConfig(PROJECT, jobNotificationConfigRequest);