This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 33c068f  KYLIN-4766 Cleanup segment storage after the job is discarded
(#1716)
33c068f is described below

commit 33c068fd66a028a174a15acb28bbca8394903f28
Author: Yaqian Zhang <598593...@qq.com>
AuthorDate: Sat Aug 7 13:20:55 2021 +0800

    KYLIN-4766 Cleanup segment storage after the job is discarded (#1716)
    
    * KYLIN-4766 Cleanup segment storage after the job is discarded
    
    * fix
---
 .../org/apache/kylin/engine/spark/metadata/cube/PathManager.java  | 7 +++----
 .../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java   | 6 ++----
 .../spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java       | 2 +-
 .../src/main/java/org/apache/kylin/rest/service/CubeService.java  | 2 +-
 .../src/main/java/org/apache/kylin/rest/service/JobService.java   | 8 +++++---
 5 files changed, 12 insertions(+), 13 deletions(-)

diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 6444715..0d34451 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -58,12 +58,11 @@ public final class PathManager {
     /**
      * Delete segment path
      */
-    public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, 
CubeSegment segment) throws IOException {
-        if (cube == null || segment == null) {
+    public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, 
String segmentName, String identifier) throws IOException {
+        if (cube == null || StringUtils.isNoneBlank(segmentName)|| 
StringUtils.isNoneBlank(identifier)) {
             return false;
         }
-        String path = getSegmentParquetStoragePath(cube, segment.getName(),
-                segment.getStorageLocationIdentifier());
+        String path = getSegmentParquetStoragePath(cube, segmentName, 
identifier);
         logger.info("Deleting segment parquet path {}", path);
         HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new 
Path(path));
         return true;
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index 886e476..1b196ba 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,15 +142,14 @@ public class NSparkCubingJob extends CubingJob {
         this.cube = cube;
     }
 
-    public void cleanupAfterJobDiscard() {
+    public void cleanupAfterJobDiscard(String segmentName, String 
segmentIdentifier) {
         try {
             PathManager.deleteJobTempPath(getConfig(), 
getParam(MetadataConstants.P_PROJECT_NAME),
                     getParam(MetadataConstants.P_JOB_ID));
 
             CubeManager cubeManager = CubeManager.getInstance(getConfig());
             CubeInstance cube = 
cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME));
-            CubeSegment segment = 
cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME), 
SegmentStatusEnum.NEW);
-            PathManager.deleteSegmentParquetStoragePath(cube, segment);
+            PathManager.deleteSegmentParquetStoragePath(cube, segmentName, 
segmentIdentifier);
         } catch (IOException e) {
             logger.warn("Delete resource file failed after job be discarded, 
due to", e);
         }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index d01f714..8c12f4e 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -59,7 +59,7 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends 
NSparkExecutable {
             // delete segments which were merged
             for (CubeSegment segment : mergingSegments) {
                 try {
-                    PathManager.deleteSegmentParquetStoragePath(cube, segment);
+                    PathManager.deleteSegmentParquetStoragePath(cube, 
segment.getName(), segment.getStorageLocationIdentifier());
                 } catch (IOException e) {
                     throw new ExecuteException("Can not delete segment: " + 
segment.getName() + ", in cube: " + cube.getName());
                 }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 6fcdc75..4fdbf1f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -669,7 +669,7 @@ public class CubeService extends BasicService implements 
InitializingBean {
 
         if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
             for (CubeSegment segment : toRemoveSegs) {
-                PathManager.deleteSegmentParquetStoragePath(cube, segment);
+                PathManager.deleteSegmentParquetStoragePath(cube, 
segment.getName(), segment.getStorageLocationIdentifier());
             }
         }
     }
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index d5b926f..eb28958 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -688,12 +688,14 @@ public class JobService extends BasicService implements 
InitializingBean {
 
         if (job.getStatus() != JobStatusEnum.DISCARDED) {
             if (executable instanceof CubingJob) {
+                String segmentName = job.getRelatedSegmentName();
+                CubeSegment segment = 
getCubeManager().getCube(job.getRelatedCube()).getSegment(segmentName, 
SegmentStatusEnum.NEW);
+                String segmentIdentifier = 
segment.getStorageLocationIdentifier();
+                cancelCubingJobInner((CubingJob) executable);
                 //Clean up job tmp and segment storage from hdfs after job be 
discarded
                 if (executable instanceof NSparkCubingJob) {
-                    ((NSparkCubingJob) executable).cleanupAfterJobDiscard();
+                    ((NSparkCubingJob) 
executable).cleanupAfterJobDiscard(segmentName, segmentIdentifier);
                 }
-
-                cancelCubingJobInner((CubingJob) executable);
                 //release global mr hive dict lock if exists
                 if (executable.getStatus().isFinalState()) {
                     try {

Reply via email to