This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new 92b77fdaa8 KYLIN-5188, fix email notification after cubing and merging job 92b77fdaa8 is described below commit 92b77fdaa819844519b2baa019472bed0edc3819 Author: Mukvin <boyboys...@163.com> AuthorDate: Wed Jun 1 12:01:08 2022 +0800 KYLIN-5188, fix email notification after cubing and merging job --- .../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java | 6 +++++- .../java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java | 2 ++ .../java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java index 1b196bab58..abb7f658fb 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java @@ -98,7 +98,8 @@ public class NSparkCubingJob extends CubingJob { job.setParam(MetadataConstants.P_DATA_RANGE_END, String.valueOf(endTime)); job.setParam(MetadataConstants.P_OUTPUT_META_URL, job.cube.getConfig().getMetadataUrl().toString()); job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(job.cube.getDescriptor().getAllCuboids().size())); - + job.setDeployEnvName(KylinConfig.getInstanceFromEnv().getDeployEnv()); + job.setNotifyList(job.cube.getDescriptor().getNotifyList()); //set param for job metrics job.setParam(MetadataConstants.P_JOB_TYPE, jobType.toString()); JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, job.cube); @@ -114,14 +115,17 @@ public class NSparkCubingJob extends CubingJob { return MetaDumpUtil.collectCubeMetadata(cubeInstance); } + @Override public String getDeployEnvName() { return getParam(DEPLOY_ENV_NAME); } + @Override public long findSourceRecordCount() { return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0")); } + @Override public long getMapReduceWaitTime() { return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L); } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java index 9a5306733c..2e6f79411f 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java @@ -84,6 +84,8 @@ public class NSparkMergingJob extends CubingJob { job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString()); job.setParam(MetadataConstants.P_JOB_TYPE, String.valueOf(jobType)); job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(cube.getDescriptor().getAllCuboids().size())); + job.setDeployEnvName(KylinConfig.getInstanceFromEnv().getDeployEnv()); + job.setNotifyList(cube.getDescriptor().getNotifyList()); JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube); JobStepFactory.addStep(job, JobStepType.MERGING, cube); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java index f09ea04180..04096e6505 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java @@ -82,6 +82,8 @@ public class NSparkOptimizingJob extends CubingJob { job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString()); job.setParam(MetadataConstants.P_JOB_TYPE, String.valueOf(jobType)); job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(cube.getDescriptor().getAllCuboids().size())); + job.setDeployEnvName(KylinConfig.getInstanceFromEnv().getDeployEnv()); + job.setNotifyList(cube.getDescriptor().getNotifyList()); // Phase 1: Prepare base cuboid data from old segment JobStepFactory.addStep(job, JobStepType.FILTER_RECOMMEND_CUBOID, cube); @@ -102,6 +104,7 @@ public class NSparkOptimizingJob extends CubingJob { return MetaDumpUtil.collectCubeMetadata(cubeInstance); } + @Override public String getDeployEnvName() { return getParam(DEPLOY_ENV_NAME); }