This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 62d5b03  KYLIN-3839 Storage clean up after refreshing and deleting segment
62d5b03 is described below

commit 62d5b0336b65013522e258499f640416f7e5ca82
Author: chao long <wayn...@qq.com>
AuthorDate: Mon Mar 18 13:34:01 2019 +0800

    KYLIN-3839 Storage clean up after refreshing and deleting segment
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../src/main/resources/kylin-defaults.properties   |  3 +
 .../java/org/apache/kylin/cube/CubeInstance.java   | 12 ++-
 .../org/apache/kylin/rest/service/CubeService.java | 40 ++++++++-
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   | 98 +++++++++++++---------
 .../hbase/steps/HBaseSparkOutputTransition.java    |  2 +-
 .../kylin/storage/hbase/util/StorageCleanUtil.java | 93 ++++++++++++++++++++
 7 files changed, 208 insertions(+), 44 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 82f79eb..c240e7e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1209,6 +1209,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.storage.hbase.replication-scope", "0"));
     }
 
+    public boolean cleanStorageAfterDelOperation() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
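
For orientation, a minimal sketch of how a caller would consult the new flag (the caller below is hypothetical; only cleanStorageAfterDelOperation() comes from this patch):

    // hypothetical caller; the accessor falls back to the FALSE constant
    // ("false") defined in KylinConfigBase when the property is unset
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.cleanStorageAfterDelOperation()) {
        // safe to physically drop HTables and delete HDFS job directories
    }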
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index c9b0d59..e0cb1f0 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -149,6 +149,9 @@ kylin.storage.partition.max-scan-bytes=3221225472
 # You can set it to a smaller value. 0 means use default.
 # kylin.storage.hbase.coprocessor-timeout-seconds=0
 
+# Clean up the underlying storage after a delete operation.
+# Set this to true to physically drop a deleted segment's storage, e.g. its HTable.
+kylin.storage.clean-after-delete-operation=false
 
 ### JOB ###
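
To opt in on a deployment, the property would be overridden in conf/kylin.properties; a sketch, given that the default shipped above leaves physical cleanup off:

    # enable physical cleanup of HTables/HDFS dirs on delete operations
    kylin.storage.clean-after-delete-operation=true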
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 4599cf6..ad99377 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -185,10 +185,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return segments.getMergingSegments(mergedSegment);
     }
 
+    public CubeSegment getOriginalSegmentToRefresh(CubeSegment refreshedSegment) {
+        return getOriginalSegment(refreshedSegment);
+    }
+
     public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
+        return getOriginalSegment(optimizedSegment);
+    }
+
+    private CubeSegment getOriginalSegment(CubeSegment toSegment) {
         for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
-            if (!optimizedSegment.equals(segment) //
-                    && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+            if (!toSegment.equals(segment) //
+                    && toSegment.getSegRange().equals(segment.getSegRange())) {
                 return segment;
             }
         }
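
The refactor above folds the refresh and optimize lookups into a single helper: a segment's "original" is the READY segment covering the identical range. A hedged usage sketch (cube and refreshedSeg are assumed to be in scope):

    // returns null when no READY segment shares the exact range
    CubeSegment original = cube.getOriginalSegmentToRefresh(refreshedSeg);
    if (original != null) {
        // the refresh replaces 'original', so its storage becomes garbage
    }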
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a9fbb97..2a5ce26 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -43,6 +45,7 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.PatternedLogger;
@@ -81,6 +84,8 @@ import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.ValidateUtil;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.StorageCleanUtil;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,8 +353,12 @@ public class CubeService extends BasicService implements InitializingBean {
             }
         }
 
+        List<CubeSegment> toRemoveSegs = cube.getSegments();
+
         int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
         getCubeManager().dropCube(cube.getName(), cubeNum == 1);//only delete cube desc when no other cube is using it
+
+        cleanSegmentStorage(toRemoveSegs);
     }
 
     /**
@@ -380,7 +389,6 @@ public class CubeService extends BasicService implements InitializingBean {
 
         this.releaseAllSegments(cube);
         return cube;
-
     }
 
     /**
@@ -550,7 +558,30 @@ public class CubeService extends BasicService implements InitializingBean {
             logger.warn(String.format(Locale.ROOT, msg.getDELETE_SEGMENT_CAUSE_GAPS(), cube.getName(), segmentName));
         }
 
-        return CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+        CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+
+        cleanSegmentStorage(Collections.singletonList(toDelete));
+
+        return cubeInstance;
+    }
+
+    // clean segment data in HBase and HDFS
+    private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
+        if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
+            return;
+        }
+
+        if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
+            List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+            List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+            for (CubeSegment seg : toRemoveSegs) {
+                toDropHTables.add(seg.getStorageLocationIdentifier());
+                toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID()));
+            }
+
+            StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
+            StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
+        }
     }
 
     public boolean isOrphonSegment(CubeInstance cube, String segId) {
@@ -586,7 +617,12 @@ public class CubeService extends BasicService implements InitializingBean {
     private void releaseAllSegments(CubeInstance cube) throws IOException {
         releaseAllJobs(cube);
 
+        List<CubeSegment> toRemoveSegs = cube.getSegments();
+
+        // remove from metadata
         getCubeManager().clearSegments(cube);
+
+        cleanSegmentStorage(toRemoveSegs);
     }
 
     public void updateOnNewSegmentReady(String cubeName) {
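
For each removed segment, cleanSegmentStorage() derives exactly two artifacts to reclaim; a sketch of that derivation in isolation (the comment values are illustrative, not from this patch):

    // per-segment storage targeted for cleanup
    String htable = seg.getStorageLocationIdentifier();   // the segment's HTable name
    String jobDir = JobBuilderSupport.getJobWorkingDir(
            seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID());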
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4d61d9b..205abe2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.kylin.common.util.StringUtil;
@@ -115,20 +116,6 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         return bulkLoadStep;
     }
 
-    public MergeGCStep createMergeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        result.setOldHTables(getMergingHTables());
-        return result;
-    }
-
-    public MergeGCStep createOptimizeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(getOptimizeHTables());
-        return result;
-    }
-
     public List<CubeSegment> getOptimizeSegments() {
         CubeInstance cube = (CubeInstance) seg.getRealization();
         List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
@@ -153,19 +140,25 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
 
     public List<String> getMergingHTables() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
-                .getMergingSegments((CubeSegment) seg);
+                .getMergingSegments(seg);
         Preconditions.checkState(mergingSegments.size() > 1,
                 "there should be more than 2 segments to merge, target segment " + seg);
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-        return mergingHTables;
+        return getOldHTables(mergingSegments);
+    }
+
+    public List<String> getRefreshingHTables() {
+        final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+        return getOldHTables(Collections.singletonList(refreshingSegment));
+    }
+
+    public List<String> getRefreshingHDFSPaths() {
+        final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+        return getOldHDFSPaths(Collections.singletonList(refreshingSegment));
     }
 
     public List<String> getMergingHDFSPaths() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
-                .getMergingSegments((CubeSegment) seg);
+                .getMergingSegments(seg);
         Preconditions.checkState(mergingSegments.size() > 1,
                 "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
@@ -203,10 +196,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.add(getOptimizationRootPath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -214,15 +204,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 
-        jobFlow.addTask(createOptimizeGCStep());
+        MergeGCStep hBaseGCStep = createHBaseGCStep(getOptimizeHTables());
+        jobFlow.addTask(hBaseGCStep);
 
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.addAll(getOptimizeHDFSPaths());
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -230,16 +218,14 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 
-        jobFlow.addTask(createMergeGCStep());
+        MergeGCStep hBaseGCStep = createHBaseGCStep(getMergingHTables());
+        jobFlow.addTask(hBaseGCStep);
 
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.addAll(getMergingHDFSPaths());
         toDeletePaths.add(getHFilePath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -252,12 +238,46 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         toDeletePaths.add(getHFilePath(jobId));
         toDeletePaths.add(getShrunkenDictionaryPath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        CubeSegment oldSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+
+        // refresh segment
+        if (oldSegment != null) {
+            // delete old hdfs job
+            toDeletePaths.addAll(getRefreshingHDFSPaths());
+
+            // drop old htables
+            MergeGCStep hBaseGCStep = createHBaseGCStep(getRefreshingHTables());
+            jobFlow.addTask(hBaseGCStep);
+        }
 
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
         jobFlow.addTask(step);
     }
 
+    /**
+     * Create an HBase garbage collection step that drops the given HTables.
+     * @param toDropHTables names of the HTables to drop
+     * @return the configured MergeGCStep
+     */
+    public MergeGCStep createHBaseGCStep(List<String> toDropHTables) {
+        MergeGCStep hBaseGCStep = new MergeGCStep();
+        hBaseGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
+        hBaseGCStep.setOldHTables(toDropHTables);
+        return hBaseGCStep;
+    }
+
+    /**
+     * Create an HDFS garbage collection step that deletes the given paths.
+     * @param toDeletePaths HDFS paths to delete
+     * @param jobId id of the owning job
+     * @return the configured HDFSPathGarbageCollectionStep
+     */
+    public HDFSPathGarbageCollectionStep createHDFSPathGCStep(List<String> toDeletePaths, String jobId) {
+        HDFSPathGarbageCollectionStep hdfsGCStep = new HDFSPathGarbageCollectionStep();
+        hdfsGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        hdfsGCStep.setDeletePaths(toDeletePaths);
+        hdfsGCStep.setJobId(jobId);
+        return hdfsGCStep;
+    }
+
 }
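
Taken together, the two factories let every GC phase be assembled the same way; a sketch mirroring addMergingGarbageCollectionSteps() above (jobFlow and jobId assumed in scope):

    // drop the merged-away HTables, then delete their HDFS leftovers
    jobFlow.addTask(createHBaseGCStep(getMergingHTables()));

    List<String> toDeletePaths = new ArrayList<>(getMergingHDFSPaths());
    toDeletePaths.add(getHFilePath(jobId));
    jobFlow.addTask(createHDFSPathGCStep(toDeletePaths, jobId));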
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index e6c3ee8..43babfd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -60,7 +60,7 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                // nothing to do
+                steps.addCubingGarbageCollectionSteps(jobFlow);
             }
 
         };
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
new file mode 100644
index 0000000..a1259b8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class StorageCleanUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StorageCleanUtil.class);
+
+    /**
+     * Drop the given HTables. This method closes hbaseAdmin once the work finishes.
+     */
+    public static void dropHTables(final HBaseAdmin hbaseAdmin, List<String> hTables) {
+        runSingleThreadTaskQuietly(() -> {
+            try {
+                for (String htable : hTables) {
+                    logger.info("Deleting HBase table {}", htable);
+
+                    if (hbaseAdmin.tableExists(htable)) {
+                        if (hbaseAdmin.isTableEnabled(htable)) {
+                            hbaseAdmin.disableTable(htable);
+                        }
+
+                        hbaseAdmin.deleteTable(htable);
+                        logger.info("Deleted HBase table {}", htable);
+                    } else {
+                        logger.info("HBase table {} does not exist.", htable);
+                    }
+                }
+            } catch (Exception e) {
+                // any leftovers will be removed by the storage cleanup job
+                logger.error("Deleting HBase table failed", e);
+            } finally {
+                IOUtils.closeQuietly(hbaseAdmin);
+            }
+        });
+    }
+
+    public static void deleteHDFSPath(final FileSystem fileSystem, List<String> hdfsPaths) {
+        runSingleThreadTaskQuietly(() -> {
+            try {
+                for (String hdfsPath : hdfsPaths) {
+                    logger.info("Deleting HDFS path {}", hdfsPath);
+                    Path path = new Path(hdfsPath);
+
+                    if (fileSystem.exists(path)) {
+                        fileSystem.delete(path, true);
+                        logger.info("Deleted HDFS path {}", hdfsPath);
+                    }
+                }
+            } catch (Exception e) {
+                // any leftovers will be removed by the storage cleanup job
+                logger.error("Deleting HDFS path failed", e);
+            }
+        });
+    }
+
+    private static void runSingleThreadTaskQuietly(Runnable task) {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            executorService.execute(task);
+        } catch (Exception e) {
+            logger.error("Failed to run task", e);
+        } finally {
+            executorService.shutdown();
+        }
+    }
+}
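
A hedged usage sketch of the new utility, mirroring the call sites in CubeService (the table name and path below are placeholders, not values from this patch):

    // both methods run on a single-thread executor and swallow failures;
    // the scheduled storage cleanup job is the safety net for leftovers
    StorageCleanUtil.dropHTables(
            new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()),
            Lists.newArrayList("KYLIN_EXAMPLE"));
    StorageCleanUtil.deleteHDFSPath(
            HadoopUtil.getWorkingFileSystem(),
            Lists.newArrayList("/kylin/kylin_metadata/kylin-example-job-id"));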
