This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 56cf4e6ee59358a2dd2a325d4e086b04049e9ad4
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Tue Aug 25 18:58:22 2020 +0800

    KYLIN-4698 Delete segment storage path after merging segment, deleting segment and dropping cube
---
 .../apache/kylin/metadata/MetadataConstants.java   |  1 -
 .../engine/spark/metadata/cube/PathManager.java    |  8 ++++++--
 .../kylin/engine/spark/job/JobStepFactory.java     |  9 --------
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  | 15 +++++++++-----
 .../engine/spark/LocalWithSparkSessionTest.java    | 13 +++++++++++-
 .../kylin/engine/spark/job/JobStepFactoryTest.java |  8 +-------
 .../org/apache/kylin/rest/service/CubeService.java | 24 ++++++++++------------
 7 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index 57801b6..36d20d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -39,7 +39,6 @@ public interface MetadataConstants {
     String P_CUBE_ID = "cubeId";
     String P_CUBE_NAME = "cubeName";
     String P_SEGMENT_IDS = "segmentIds";
-    String P_SEGMENT_NAMES = "segmentNames";
     String P_JOB_ID = "jobId";
     String P_JOB_TYPE = "jobType";
     String P_CLASS_NAME = "className";

diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index ef51532..5353523 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -31,12 +31,16 @@ public final class PathManager {
     public static String getParquetStoragePath(KylinConfig config, String cubeName, String segName, String identifier, String cuboidId) {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        String hdfsWorkDir = config.getHdfsWorkingDirectory(cube.getProject());
-        return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segName + "_" + identifier + File.separator + cuboidId;
+        return getParquetStoragePath(cube, segName, identifier, Long.parseLong(cuboidId));
     }
 
     public static String getParquetStoragePath(CubeInstance cube, String segName, String identifier, Long cuboidId) {
         String hdfsWorkDir = cube.getConfig().getHdfsWorkingDirectory(cube.getProject());
         return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier + File.separator + cuboidId;
     }
+
+    public static String getSegmentParquetStoragePath(CubeInstance cube, String segName, String identifier) {
+        String hdfsWorkDir = cube.getConfig().getHdfsWorkingDirectory(cube.getProject());
+        return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
+    }
 }
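For reference, the new segment-level helper resolves the directory one level above the per-cuboid parquet paths, so a single delete removes every cuboid of a segment. A minimal sketch of the resulting layout (the working directory, cube, segment, and identifier values below are invented for illustration, not taken from the commit):

    // Hypothetical inputs, for illustration only.
    // cube.getConfig().getHdfsWorkingDirectory(project) -> "hdfs://ns/kylin/kylin_metadata/my_project/"
    // cube.getName()                                    -> "sales_cube"
    // segName                                           -> "20200101000000_20200201000000"
    // identifier (storage location identifier)          -> "abc-123"
    //
    // getSegmentParquetStoragePath(cube, segName, identifier) yields:
    //   hdfs://ns/kylin/kylin_metadata/my_project/parquet/sales_cube/20200101000000_20200201000000_abc-123
    // getParquetStoragePath(cube, segName, identifier, cuboidId) appends one more component:
    //   hdfs://ns/kylin/kylin_metadata/my_project/parquet/sales_cube/20200101000000_20200201000000_abc-123/<cuboidId>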
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index ebfaaf0..96de267 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -55,15 +55,6 @@ public class JobStepFactory {
         step.setParams(parent.getParams());
         step.setProject(parent.getProject());
         step.setTargetSubject(parent.getTargetSubject());
-        if (step instanceof NSparkUpdateMetaAndCleanupAfterMergeStep) {
-            CubeSegment mergeSegment = cube.getSegmentById(parent.getTargetSegments().iterator().next());
-            final Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
-            step.setParam(MetadataConstants.P_SEGMENT_NAMES,
-                    String.join(",", NSparkCubingUtil.toSegmentNames(mergingSegments)));
-            step.setParam(CubingExecutableUtil.SEGMENT_ID, parent.getParam(CubingExecutableUtil.SEGMENT_ID));
-            step.setParam(MetadataConstants.P_JOB_TYPE, parent.getParam(MetadataConstants.P_JOB_TYPE));
-            step.setParam(MetadataConstants.P_OUTPUT_META_URL, parent.getParam(MetadataConstants.P_OUTPUT_META_URL));
-        }
         parent.addTask(step);
         //after addTask, step's id is changed
         step.setDistMetaUrl(config.getJobTmpMetaStoreUrl(parent.getProject(), step.getId()));

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index dc0c982..e0a6704 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -27,14 +27,16 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.ExecutableContext;
-
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.Segments;
 
 public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     public NSparkUpdateMetaAndCleanupAfterMergeStep() {
@@ -44,18 +46,21 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         String cubeId = getParam(MetadataConstants.P_CUBE_ID);
-        String[] segments = StringUtils.split(getParam(MetadataConstants.P_SEGMENT_NAMES), ",");
+        String mergedSegmentUuid = getParam(CubingExecutableUtil.SEGMENT_ID);
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
         updateMetadataAfterMerge(cubeId);
 
-        for (String segmentName : segments) {
-            String path = config.getHdfsWorkingDirectory() + cube.getProject() + "/parquet/" + cube.getName() + "/" + segmentName;
+        CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
+        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
+        for (CubeSegment segment : mergingSegments) {
+            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                    segment.getStorageLocationIdentifier());
             try {
                 HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
             } catch (IOException e) {
-                throw new ExecuteException("Can not delete segment: " + segmentName + ", in cube: " + cube.getName());
+                throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
             }
         }
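Net effect of the two hunks above: JobStepFactory no longer serializes merging segment names into the step's params (P_SEGMENT_NAMES is removed entirely), and the cleanup step derives the to-be-deleted segments at execution time from the merged segment's id. Condensed, the replacement logic is (a sketch restating the diff, identifiers as in the code above):

    // Old: names travelled through job params and were re-split at run time.
    //   String[] segments = StringUtils.split(getParam(MetadataConstants.P_SEGMENT_NAMES), ",");
    // New: resolve the merged segment by id, then ask the cube which segments it replaces.
    CubeSegment mergedSegment = cube.getSegmentById(getParam(CubingExecutableUtil.SEGMENT_ID));
    Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);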
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2b3f480..2480a6e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.spark;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.TempMetadataBuilder;
 import org.apache.kylin.cube.CubeInstance;
@@ -35,6 +37,7 @@ import org.apache.kylin.engine.spark.job.NSparkCubingStep;
 import org.apache.kylin.engine.spark.job.NSparkMergingJob;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.metadata.MetadataConverter;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -46,6 +49,7 @@ import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -191,7 +195,14 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
         CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force);
         NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
         execMgr.addJob(mergeJob);
-        return wait(mergeJob);
+        ExecutableState result = wait(mergeJob);
+        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
+        for (CubeSegment segment : mergingSegments) {
+            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                    segment.getStorageLocationIdentifier());
+            Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+        }
+        return result;
     }
 
     protected void fullBuildCube(String cubeName) throws Exception {
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
index b29b09b..14a424f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
@@ -131,13 +131,7 @@ public class JobStepFactoryTest extends LocalWithSparkSessionTest {
         CubeInstance cubeInstance = cubeMgr.reloadCube(CUBE_NAME);
         NSparkUpdateMetaAndCleanupAfterMergeStep cleanStep = job.getCleanUpAfterMergeStep();
         job.getParams().forEach((key, value) -> {
-            if (key.equalsIgnoreCase(MetadataConstants.P_SEGMENT_IDS)) {
-                final List<String> needDeleteSegmentNames = cubeInstance.getMergingSegments(mergedSegment).stream()
-                        .map(CubeSegment::getName).collect(Collectors.toList());
-                Assert.assertEquals(needDeleteSegmentNames, Arrays.asList(cleanStep.getParam(MetadataConstants.P_SEGMENT_NAMES).split(",")));
-            } else {
-                Assert.assertEquals(value, mergeStep.getParam(key));
-            }
+            Assert.assertEquals(value, mergeStep.getParam(key));
         });
         Assert.assertEquals(config.getJobTmpMetaStoreUrl(getProject(), cleanStep.getId()).toString(),
                 cleanStep.getDistMetaUrl());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index c3d60b7..2aac66e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,10 +29,12 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -44,9 +46,9 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
 import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -360,7 +362,7 @@ public class CubeService extends BasicService implements InitializingBean {
         int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
         getCubeManager().dropCube(cube.getName(), cubeNum == 1);//only delete cube desc when no other cube is using it
 
-        cleanSegmentStorage(toRemoveSegs);
+        cleanSegmentStorage(cube, toRemoveSegs);
     }
 
     /**
@@ -623,28 +625,24 @@ public class CubeService extends BasicService implements InitializingBean {
 
         CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
 
-        cleanSegmentStorage(Collections.singletonList(toDelete));
+        cleanSegmentStorage(cubeInstance, Collections.singletonList(toDelete));
 
         return cubeInstance;
     }
 
     // clean segment data in hbase and hdfs
-    private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
+    private void cleanSegmentStorage(CubeInstance cube, List<CubeSegment> toRemoveSegs) throws IOException {
         if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
             return;
         }
 
         if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
-            List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
-            List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
             for (CubeSegment seg : toRemoveSegs) {
-                toDropHTables.add(seg.getStorageLocationIdentifier());
-                toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(),
-                        seg.getLastBuildJobID()));
+                String path = PathManager.getSegmentParquetStoragePath(cube, seg.getName(),
+                        seg.getStorageLocationIdentifier());
+                logger.info("Deleting segment HDFS path {}", path);
+                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
             }
-
-//            StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
-//            StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
         }
     }
 
@@ -702,7 +700,7 @@ public class CubeService extends BasicService implements InitializingBean {
 
         // remove from metadata
         getCubeManager().clearSegments(cube);
 
-        cleanSegmentStorage(toRemoveSegs);
+        cleanSegmentStorage(cube, toRemoveSegs);
     }
 
     public void updateOnNewSegmentReady(String cubeName) {
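Taken together, dropCube, deleteSegment, and clearSegments now funnel through one cleanup path. Condensed, the flow after this commit is roughly (a sketch summarizing the hunks above, not code lifted from the repository):

    // Skip entirely when storage cleanup after delete operations is disabled in KylinConfig.
    if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
        return;
    }
    for (CubeSegment seg : toRemoveSegs) {
        // Segment-level parquet directory, built from the cube's own config (see PathManager).
        String path = PathManager.getSegmentParquetStoragePath(cube, seg.getName(),
                seg.getStorageLocationIdentifier());
        logger.info("Deleting segment HDFS path {}", path);
        // May throw IOException, which cleanSegmentStorage propagates to its callers.
        HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
    }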