This is an automated email from the ASF dual-hosted git repository. zhangzc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new 33c068f KYLIN-4766 Cleanup segment storage after job be discarded (#1716) 33c068f is described below commit 33c068fd66a028a174a15acb28bbca8394903f28 Author: Yaqian Zhang <598593...@qq.com> AuthorDate: Sat Aug 7 13:20:55 2021 +0800 KYLIN-4766 Cleanup segment storage after job be discarded (#1716) * KYLIN-4766 Cleanup segment storage after job be discarded * fix --- .../org/apache/kylin/engine/spark/metadata/cube/PathManager.java | 7 +++---- .../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java | 6 ++---- .../spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java | 2 +- .../src/main/java/org/apache/kylin/rest/service/CubeService.java | 2 +- .../src/main/java/org/apache/kylin/rest/service/JobService.java | 8 +++++--- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java index 6444715..0d34451 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java @@ -58,12 +58,11 @@ public final class PathManager { /** * Delete segment path */ - public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, CubeSegment segment) throws IOException { - if (cube == null || segment == null) { + public static boolean deleteSegmentParquetStoragePath(CubeInstance cube, String segmentName, String identifier) throws IOException { + if (cube == null || StringUtils.isNoneBlank(segmentName)|| StringUtils.isNoneBlank(identifier)) { return false; } - String path = getSegmentParquetStoragePath(cube, segment.getName(), - segment.getStorageLocationIdentifier()); + String path = getSegmentParquetStoragePath(cube, segmentName, identifier); logger.info("Deleting segment parquet path {}", path); HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path)); return true; diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java index 886e476..1b196ba 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java @@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.utils.MetaDumpUtil; import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.shaded.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,15 +142,14 @@ public class NSparkCubingJob extends CubingJob { this.cube = cube; } - public void cleanupAfterJobDiscard() { + public void cleanupAfterJobDiscard(String segmentName, String segmentIdentifier) { try { PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), getParam(MetadataConstants.P_JOB_ID)); CubeManager cubeManager = CubeManager.getInstance(getConfig()); CubeInstance cube = cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME)); - CubeSegment segment = cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME), SegmentStatusEnum.NEW); - PathManager.deleteSegmentParquetStoragePath(cube, segment); + PathManager.deleteSegmentParquetStoragePath(cube, segmentName, segmentIdentifier); } catch (IOException e) { logger.warn("Delete resource file failed after job be discarded, due to", e); } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java index d01f714..8c12f4e 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java @@ -59,7 +59,7 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable { // delete segments which were merged for (CubeSegment segment : mergingSegments) { try { - PathManager.deleteSegmentParquetStoragePath(cube, segment); + PathManager.deleteSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()); } catch (IOException e) { throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName()); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 6fcdc75..4fdbf1f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -669,7 +669,7 @@ public class CubeService extends BasicService implements InitializingBean { if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) { for (CubeSegment segment : toRemoveSegs) { - PathManager.deleteSegmentParquetStoragePath(cube, segment); + PathManager.deleteSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()); } } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index d5b926f..eb28958 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -688,12 +688,14 @@ public class JobService extends BasicService implements InitializingBean { if (job.getStatus() != JobStatusEnum.DISCARDED) { if (executable instanceof CubingJob) { + String segmentName = job.getRelatedSegmentName(); + CubeSegment segment = getCubeManager().getCube(job.getRelatedCube()).getSegment(segmentName, SegmentStatusEnum.NEW); + String segmentIdentifier = segment.getStorageLocationIdentifier(); + cancelCubingJobInner((CubingJob) executable); //Clean up job tmp and segment storage from hdfs after job be discarded if (executable instanceof NSparkCubingJob) { - ((NSparkCubingJob) executable).cleanupAfterJobDiscard(); + ((NSparkCubingJob) executable).cleanupAfterJobDiscard(segmentName, segmentIdentifier); } - - cancelCubingJobInner((CubingJob) executable); //release global mr hive dict lock if exists if (executable.getStatus().isFinalState()) { try {