This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 56cf4e6ee59358a2dd2a325d4e086b04049e9ad4
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Tue Aug 25 18:58:22 2020 +0800

    KYLIN-4698 Delete segment storage path after merging segment, deleting segment and dropping cube
---
 .../apache/kylin/metadata/MetadataConstants.java   |  1 -
 .../engine/spark/metadata/cube/PathManager.java    |  8 ++++++--
 .../kylin/engine/spark/job/JobStepFactory.java     |  9 --------
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  | 15 +++++++++-----
 .../engine/spark/LocalWithSparkSessionTest.java    | 13 +++++++++++-
 .../kylin/engine/spark/job/JobStepFactoryTest.java |  8 +-------
 .../org/apache/kylin/rest/service/CubeService.java | 24 ++++++++++------------
 7 files changed, 40 insertions(+), 38 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index 57801b6..36d20d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -39,7 +39,6 @@ public interface MetadataConstants {
     String P_CUBE_ID = "cubeId";
     String P_CUBE_NAME = "cubeName";
     String P_SEGMENT_IDS = "segmentIds";
-    String P_SEGMENT_NAMES = "segmentNames";
     String P_JOB_ID = "jobId";
     String P_JOB_TYPE = "jobType";
     String P_CLASS_NAME = "className";
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index ef51532..5353523 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -31,12 +31,16 @@ public final class PathManager {
 
     public static String getParquetStoragePath(KylinConfig config, String cubeName, String segName, String identifier, String cuboidId) {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        String hdfsWorkDir = config.getHdfsWorkingDirectory(cube.getProject());
-        return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segName + "_" + identifier + File.separator + cuboidId;
+        return getParquetStoragePath(cube, segName, identifier, Long.parseLong(cuboidId));
     }
 
     public static String getParquetStoragePath(CubeInstance cube, String segName, String identifier, Long cuboidId) {
         String hdfsWorkDir = cube.getConfig().getHdfsWorkingDirectory(cube.getProject());
         return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier + File.separator + cuboidId;
     }
+
+    public static String getSegmentParquetStoragePath(CubeInstance cube, String segName, String identifier) {
+        String hdfsWorkDir = cube.getConfig().getHdfsWorkingDirectory(cube.getProject());
+        return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
+    }
 }
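
For reference, the new segment-level helper returns the directory one level above the per-cuboid paths, so removing it deletes every cuboid of the segment in one call. A minimal sketch of the resulting path layout (all values below are hypothetical):

    import java.io.File;

    public class SegmentPathSketch {
        public static void main(String[] args) {
            // Hypothetical inputs; the real values come from KylinConfig and CubeInstance.
            String hdfsWorkDir = "hdfs://ns/kylin/kylin_metadata/my_project/";
            String cubeName = "kylin_sales_cube";
            String segName = "20120101000000_20130101000000";
            String identifier = "abc123";

            // Mirrors PathManager.getSegmentParquetStoragePath(cube, segName, identifier)
            String segmentPath = hdfsWorkDir + "parquet" + File.separator + cubeName
                    + File.separator + segName + "_" + identifier;
            System.out.println(segmentPath);
            // -> hdfs://ns/kylin/kylin_metadata/my_project/parquet/kylin_sales_cube/20120101000000_20130101000000_abc123
        }
    }
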
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index ebfaaf0..96de267 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -55,15 +55,6 @@ public class JobStepFactory {
         step.setParams(parent.getParams());
         step.setProject(parent.getProject());
         step.setTargetSubject(parent.getTargetSubject());
-        if (step instanceof NSparkUpdateMetaAndCleanupAfterMergeStep) {
-            CubeSegment mergeSegment = cube.getSegmentById(parent.getTargetSegments().iterator().next());
-            final Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
-            step.setParam(MetadataConstants.P_SEGMENT_NAMES,
-                    String.join(",", NSparkCubingUtil.toSegmentNames(mergingSegments)));
-            step.setParam(CubingExecutableUtil.SEGMENT_ID, parent.getParam(CubingExecutableUtil.SEGMENT_ID));
-            step.setParam(MetadataConstants.P_JOB_TYPE, parent.getParam(MetadataConstants.P_JOB_TYPE));
-            step.setParam(MetadataConstants.P_OUTPUT_META_URL, parent.getParam(MetadataConstants.P_OUTPUT_META_URL));
-        }
         parent.addTask(step);
         //after addTask, step's id is changed
         step.setDistMetaUrl(config.getJobTmpMetaStoreUrl(parent.getProject(), step.getId()));
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index dc0c982..e0a6704 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -27,14 +27,16 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.ExecutableContext;
-
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.Segments;
 
 public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     public NSparkUpdateMetaAndCleanupAfterMergeStep() {
@@ -44,18 +46,21 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         String cubeId = getParam(MetadataConstants.P_CUBE_ID);
-        String[] segments = StringUtils.split(getParam(MetadataConstants.P_SEGMENT_NAMES), ",");
+        String mergedSegmentUuid = getParam(CubingExecutableUtil.SEGMENT_ID);
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
 
         updateMetadataAfterMerge(cubeId);
 
-        for (String segmentName : segments) {
-            String path = config.getHdfsWorkingDirectory() + cube.getProject() + "/parquet/" + cube.getName() + "/" + segmentName;
+        CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
+        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
+        for (CubeSegment segment : mergingSegments) {
+            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                    segment.getStorageLocationIdentifier());
             try {
                 HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
             } catch (IOException e) {
-                throw new ExecuteException("Can not delete segment: " + segmentName + ", in cube: " + cube.getName());
+                throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
             }
             }
         }
 
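
The step above no longer needs the removed P_SEGMENT_NAMES parameter: given only the merged segment's UUID (already carried in SEGMENT_ID), the source segments and their storage paths are recomputed from cube metadata. A minimal sketch of that lookup, assuming the Kylin classes from this patch are on the classpath (the helper method itself is hypothetical):

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.spark.metadata.cube.PathManager;
    import org.apache.kylin.metadata.model.Segments;

    public class MergedSegmentPathsSketch {
        // Resolve the segments that were merged away and print the directory
        // each one occupies, mirroring the calls made in doWork() above.
        static void printSourceSegmentPaths(CubeInstance cube, String mergedSegmentUuid) {
            CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
            for (CubeSegment segment : mergingSegments) {
                System.out.println(PathManager.getSegmentParquetStoragePath(cube,
                        segment.getName(), segment.getStorageLocationIdentifier()));
            }
        }
    }
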
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2b3f480..2480a6e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.spark;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.TempMetadataBuilder;
 import org.apache.kylin.cube.CubeInstance;
@@ -35,6 +37,7 @@ import org.apache.kylin.engine.spark.job.NSparkCubingStep;
 import org.apache.kylin.engine.spark.job.NSparkMergingJob;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.metadata.MetadataConverter;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -46,6 +49,7 @@ import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -191,7 +195,14 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
         CubeSegment mergeSegment = cubeMgr.mergeSegments(cube, new SegmentRange.TSRange(start, end), null, force);
         NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment,  "ADMIN");
         execMgr.addJob(mergeJob);
-        return wait(mergeJob);
+        ExecutableState result = wait(mergeJob);
+        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
+        for (CubeSegment segment : mergingSegments) {
+            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                    segment.getStorageLocationIdentifier());
+            Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+        }
+        return result;
     }
 
     protected void fullBuildCube(String cubeName) throws Exception {
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
index b29b09b..14a424f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
@@ -131,13 +131,7 @@ public class JobStepFactoryTest extends LocalWithSparkSessionTest {
         CubeInstance cubeInstance = cubeMgr.reloadCube(CUBE_NAME);
         NSparkUpdateMetaAndCleanupAfterMergeStep cleanStep = job.getCleanUpAfterMergeStep();
         job.getParams().forEach((key, value) -> {
-            if (key.equalsIgnoreCase(MetadataConstants.P_SEGMENT_IDS)) {
-                final List<String> needDeleteSegmentNames = cubeInstance.getMergingSegments(mergedSegment).stream()
-                        .map(CubeSegment::getName).collect(Collectors.toList());
-                Assert.assertEquals(needDeleteSegmentNames, Arrays.asList(cleanStep.getParam(MetadataConstants.P_SEGMENT_NAMES).split(",")));
-            } else {
-                Assert.assertEquals(value, mergeStep.getParam(key));
-            }
+            Assert.assertEquals(value, mergeStep.getParam(key));
         });
         Assert.assertEquals(config.getJobTmpMetaStoreUrl(getProject(), cleanStep.getId()).toString(),
                 cleanStep.getDistMetaUrl());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index c3d60b7..2aac66e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,10 +29,12 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -44,9 +46,9 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
 import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -360,7 +362,7 @@ public class CubeService extends BasicService implements InitializingBean {
         int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
         getCubeManager().dropCube(cube.getName(), cubeNum == 1);//only delete cube desc when no other cube is using it
 
-        cleanSegmentStorage(toRemoveSegs);
+        cleanSegmentStorage(cube, toRemoveSegs);
     }
 
     /**
@@ -623,28 +625,24 @@ public class CubeService extends BasicService implements InitializingBean {
 
         CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
 
-        cleanSegmentStorage(Collections.singletonList(toDelete));
+        cleanSegmentStorage(cubeInstance, Collections.singletonList(toDelete));
 
         return cubeInstance;
     }
 
     // clean segment data in hbase and hdfs
-    private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
+    private void cleanSegmentStorage(CubeInstance cube, List<CubeSegment> toRemoveSegs) throws IOException {
         if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
             return;
         }
 
         if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
-            List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
-            List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
             for (CubeSegment seg : toRemoveSegs) {
-                toDropHTables.add(seg.getStorageLocationIdentifier());
-                toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(),
-                        seg.getLastBuildJobID()));
+                String path = PathManager.getSegmentParquetStoragePath(cube, seg.getName(),
+                        seg.getStorageLocationIdentifier());
+                logger.info("Deleting segment HDFS path {}", path);
+                HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path(path));
             }
-
-//            StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
-//            StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
         }
     }
 
@@ -702,7 +700,7 @@ public class CubeService extends BasicService implements InitializingBean {
         // remove from metadata
         getCubeManager().clearSegments(cube);
 
-        cleanSegmentStorage(toRemoveSegs);
+        cleanSegmentStorage(cube, toRemoveSegs);
     }
 
     public void updateOnNewSegmentReady(String cubeName) {
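
At the HDFS level, cleaning a segment now amounts to one recursive delete of its directory. A rough equivalent using the plain Hadoop FileSystem API (HadoopUtil.deletePath wraps approximately this; the path value is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeleteSegmentPathSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path segmentDir = new Path("hdfs://ns/kylin/kylin_metadata/my_project/"
                    + "parquet/kylin_sales_cube/20120101000000_20130101000000_abc123");
            FileSystem fs = segmentDir.getFileSystem(conf);
            if (fs.exists(segmentDir)) {
                // recursive=true removes the segment directory and all cuboid subdirectories
                fs.delete(segmentDir, true);
            }
        }
    }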
