This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 62d5b03  KYLIN-3839 Strorage clean up after refreshing and deleting segment
62d5b03 is described below

commit 62d5b0336b65013522e258499f640416f7e5ca82
Author: chao long <wayn...@qq.com>
AuthorDate: Mon Mar 18 13:34:01 2019 +0800

    KYLIN-3839 Strorage clean up after refreshing and deleting segment
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../src/main/resources/kylin-defaults.properties   |  3 +
 .../java/org/apache/kylin/cube/CubeInstance.java   | 12 ++-
 .../org/apache/kylin/rest/service/CubeService.java | 40 ++++++++-
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   | 98 +++++++++++++---------
 .../hbase/steps/HBaseSparkOutputTransition.java    |  2 +-
 .../kylin/storage/hbase/util/StorageCleanUtil.java | 93 ++++++++++++++++++++
 7 files changed, 208 insertions(+), 44 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 82f79eb..c240e7e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1209,6 +1209,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.storage.hbase.replication-scope", "0"));
     }
 
+    public boolean cleanStorageAfterDelOperation() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
+    }
+
     // ============================================================================
     // ENGINE.MR
     // ============================================================================
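Note: the accessor above is the single switch for the physical cleanup added by this commit. To opt in, a kylin.properties entry along these lines should work (a sketch; the shipped default, added in the next file, remains false):

    # enable physical cleanup when purging a cube, deleting a segment, or refreshing
    kylin.storage.clean-after-delete-operation=true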
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index c9b0d59..e0cb1f0 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -149,6 +149,9 @@ kylin.storage.partition.max-scan-bytes=3221225472
 # You can set it to a smaller value. 0 means use default.
 # kylin.storage.hbase.coprocessor-timeout-seconds=0
 
+# clean real storage after delete operation
+# if you want to delete the real storage like htable of deleting segment, you can set it to true
+kylin.storage.clean-after-delete-operation=false
 
 ### JOB ###
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 4599cf6..ad99377 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -185,10 +185,18 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return segments.getMergingSegments(mergedSegment);
     }
 
+    public CubeSegment getOriginalSegmentToRefresh(CubeSegment refreshedSegment) {
+        return getOriginalSegment(refreshedSegment);
+    }
+
     public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
+        return getOriginalSegment(optimizedSegment);
+    }
+
+    private CubeSegment getOriginalSegment(CubeSegment toSegment) {
         for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
-            if (!optimizedSegment.equals(segment) //
-                    && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+            if (!toSegment.equals(segment) //
+                    && toSegment.getSegRange().equals(segment.getSegRange())) {
                 return segment;
             }
         }
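Note: getOriginalSegment finds, among READY segments, the one whose range exactly matches the given segment; a refreshed segment shares its range with the segment it replaces, which is what makes the later storage cleanup possible. A minimal sketch (cube and newSeg are illustrative names, not from the commit):

    // After a refresh builds newSeg over the same range as an existing READY
    // segment, the replaced segment can be looked up for storage cleanup.
    CubeSegment original = cube.getOriginalSegmentToRefresh(newSeg);
    if (original != null) {
        String oldHTable = original.getStorageLocationIdentifier(); // HTable to drop later
    }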
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a9fbb97..2a5ce26 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -43,6 +45,7 @@ import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.PatternedLogger;
@@ -81,6 +84,8 @@ import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.ValidateUtil;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.StorageCleanUtil;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,8 +353,12 @@ public class CubeService extends BasicService implements InitializingBean {
             }
         }
 
+        List<CubeSegment> toRemoveSegs = cube.getSegments();
+
        int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
         getCubeManager().dropCube(cube.getName(), cubeNum == 1);//only delete cube desc when no other cube is using it
+
+        cleanSegmentStorage(toRemoveSegs);
     }
 
     /**
@@ -380,7 +389,6 @@ public class CubeService extends BasicService implements InitializingBean {
 
         this.releaseAllSegments(cube);
         return cube;
-
     }
 
     /**
@@ -550,7 +558,30 @@ public class CubeService extends BasicService implements InitializingBean {
             logger.warn(String.format(Locale.ROOT, msg.getDELETE_SEGMENT_CAUSE_GAPS(), cube.getName(), segmentName));
         }
 
-        return CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+        CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+
+        cleanSegmentStorage(Collections.singletonList(toDelete));
+
+        return cubeInstance;
+    }
+
+    // clean segment data in hbase and hdfs
+    private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
+        if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
+            return;
+        }
+
+        if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
+            List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+            List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
+            for (CubeSegment seg : toRemoveSegs) {
+                toDropHTables.add(seg.getStorageLocationIdentifier());
+                toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID()));
+            }
+
+            StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
+            StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
+        }
     }
 
     public boolean isOrphonSegment(CubeInstance cube, String segId) {
@@ -586,7 +617,12 @@ public class CubeService extends BasicService implements InitializingBean {
     private void releaseAllSegments(CubeInstance cube) throws IOException {
         releaseAllJobs(cube);
 
+        List<CubeSegment> toRemoveSegs = cube.getSegments();
+
+        // remove from metadata
         getCubeManager().clearSegments(cube);
+
+        cleanSegmentStorage(toRemoveSegs);
     }
 
     public void updateOnNewSegmentReady(String cubeName) {
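Note: for each dropped segment, cleanSegmentStorage pairs the segment's HTable with the HDFS working directory of its last build job, then hands both lists to StorageCleanUtil. Roughly, per segment (method names are from the diff; the resulting path shape is an assumption about JobBuilderSupport.getJobWorkingDir):

    // What gets collected for one segment before handing off to StorageCleanUtil:
    String htable = seg.getStorageLocationIdentifier();      // HBase table backing the segment
    String jobDir = JobBuilderSupport.getJobWorkingDir(
            seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID());
    // jobDir is expected to look like <hdfs-working-dir>/kylin-<last_build_job_id>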
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4d61d9b..205abe2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.kylin.common.util.StringUtil;
@@ -115,20 +116,6 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         return bulkLoadStep;
     }
 
-    public MergeGCStep createMergeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        result.setOldHTables(getMergingHTables());
-        return result;
-    }
-
-    public MergeGCStep createOptimizeGCStep() {
-        MergeGCStep result = new MergeGCStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(getOptimizeHTables());
-        return result;
-    }
-
     public List<CubeSegment> getOptimizeSegments() {
         CubeInstance cube = (CubeInstance) seg.getRealization();
         List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
@@ -153,19 +140,25 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     public List<String> getMergingHTables() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
-                .getMergingSegments((CubeSegment) seg);
+                .getMergingSegments(seg);
         Preconditions.checkState(mergingSegments.size() > 1,
                 "there should be more than 2 segments to merge, target segment " + seg);
-        final List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-        return mergingHTables;
+        return getOldHTables(mergingSegments);
+    }
+
+    public List<String> getRefreshingHTables() {
+        final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+        return getOldHTables(Collections.singletonList(refreshingSegment));
+    }
+
+    public List<String> getRefreshingHDFSPaths() {
+        final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
+        return getOldHDFSPaths(Collections.singletonList(refreshingSegment));
     }
 
     public List<String> getMergingHDFSPaths() {
         final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
-                .getMergingSegments((CubeSegment) seg);
+                .getMergingSegments(seg);
         Preconditions.checkState(mergingSegments.size() > 1,
                 "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
@@ -203,10 +196,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.add(getOptimizationRootPath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -214,15 +204,13 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 
-        jobFlow.addTask(createOptimizeGCStep());
+        MergeGCStep hBaseGCStep = createHBaseGCStep(getOptimizeHTables());
+        jobFlow.addTask(hBaseGCStep);
 
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.addAll(getOptimizeHDFSPaths());
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -230,16 +218,14 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
     public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
         String jobId = jobFlow.getId();
 
-        jobFlow.addTask(createMergeGCStep());
+        MergeGCStep hBaseGCStep = createHBaseGCStep(getMergingHTables());
+        jobFlow.addTask(hBaseGCStep);
 
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.addAll(getMergingHDFSPaths());
         toDeletePaths.add(getHFilePath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
 
         jobFlow.addTask(step);
     }
@@ -252,12 +238,46 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         toDeletePaths.add(getHFilePath(jobId));
         toDeletePaths.add(getShrunkenDictionaryPath(jobId));
 
-        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
-        step.setDeletePaths(toDeletePaths);
-        step.setJobId(jobId);
+        CubeSegment oldSegment = ((CubeInstance)seg.getRealization()).getOriginalSegmentToRefresh(seg);
+
+        // refresh segment
+        if (oldSegment != null) {
+            // delete old hdfs job
+            toDeletePaths.addAll(getRefreshingHDFSPaths());
+
+            // drop old htables
+            MergeGCStep hBaseGCStep = createHBaseGCStep(getRefreshingHTables());
+            jobFlow.addTask(hBaseGCStep);
+        }
+
+        HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
         jobFlow.addTask(step);
     }
 
+    /**
+     * create 'HBase Garbage clean step' to drop HTables in HBase
+     * @param toDropHTables
+     * @return
+     */
+    public MergeGCStep createHBaseGCStep(List<String> toDropHTables) {
+        MergeGCStep hBaseGCStep = new MergeGCStep();
+        hBaseGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
+        hBaseGCStep.setOldHTables(toDropHTables);
+        return hBaseGCStep;
+    }
+
+    /**
+     * create 'HDFS Garbage clean step' to delete paths on HDFS
+     * @param toDeletePaths
+     * @param jobId
+     * @return
+     */
+    public HDFSPathGarbageCollectionStep createHDFSPathGCStep(List<String> toDeletePaths, String jobId) {
+        HDFSPathGarbageCollectionStep hdfsGCStep = new HDFSPathGarbageCollectionStep();
+        hdfsGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        hdfsGCStep.setDeletePaths(toDeletePaths);
+        hdfsGCStep.setJobId(jobId);
+        return hdfsGCStep;
+    }
+
 }
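Note: the two new factory methods replace the merge- and optimize-specific GC steps, so every cleanup phase now composes the same way. A sketch of the pattern (steps and jobFlow stand in for an HBaseJobSteps instance and a DefaultChainedExecutable, as in the methods above):

    // Generic cleanup wiring: one HBase GC step for old HTables,
    // one HDFS GC step for old job directories.
    MergeGCStep hbaseGC = steps.createHBaseGCStep(steps.getMergingHTables());
    jobFlow.addTask(hbaseGC);
    HDFSPathGarbageCollectionStep hdfsGC = steps.createHDFSPathGCStep(steps.getMergingHDFSPaths(), jobFlow.getId());
    jobFlow.addTask(hdfsGC);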
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index e6c3ee8..43babfd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -60,7 +60,7 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                // nothing to do
+                steps.addCubingGarbageCollectionSteps(jobFlow);
             }
         };
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
new file mode 100644
index 0000000..a1259b8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class StorageCleanUtil {
+    private static final Logger logger = LoggerFactory.getLogger(StorageCleanUtil.class);
+
+    /**
+     * this method will close hbaseAdmin after finishing the work.
+     */
+    public static void dropHTables(final HBaseAdmin hbaseAdmin, List<String> hTables) {
+        runSingleThreadTaskQuietly(() -> {
+            try {
+                for (String htable : hTables) {
+                    logger.info("Deleting HBase table {}", htable);
+
+                    if (hbaseAdmin.tableExists(htable)) {
+                        if (hbaseAdmin.isTableEnabled(htable)) {
+                            hbaseAdmin.disableTable(htable);
+                        }
+
+                        hbaseAdmin.deleteTable(htable);
+                        logger.info("Deleted HBase table {}", htable);
+                    } else {
+                        logger.info("HBase table {} does not exist.", htable);
+                    }
+                }
+            } catch (Exception e) {
+                // storage cleanup job will delete it
+                logger.error("Deleting HBase table failed");
+            } finally {
+                IOUtils.closeQuietly(hbaseAdmin);
+            }
+        });
+    }
+
+    public static void deleteHDFSPath(final FileSystem fileSystem, List<String> hdfsPaths) {
+        runSingleThreadTaskQuietly(() -> {
+            try {
+                for (String hdfsPath : hdfsPaths) {
+                    logger.info("Deleting HDFS path {}", hdfsPath);
+                    Path path = new Path(hdfsPath);
+
+                    if (fileSystem.exists(path)) {
+                        fileSystem.delete(path, true);
+                        logger.info("Deleted HDFS path {}", hdfsPath);
+                    }
+                }
+            } catch (Exception e) {
+                // storage cleanup job will delete it
+                logger.error("Deleting HDFS path failed");
+            }
+        });
+    }
+
+    private static void runSingleThreadTaskQuietly(Runnable task) {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            executorService.execute(task);
+        } catch (Exception e) {
+            logger.error("Failed to run task", e);
+        } finally {
+            executorService.shutdown();
+        }
+    }
+}
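Note: for reference, a minimal sketch of driving the new utility directly, mirroring what CubeService.cleanSegmentStorage does. The table name and HDFS path are hypothetical placeholders; the HBaseAdmin constructor and HadoopUtil call match the ones used in the diff.

    // Hypothetical driver for StorageCleanUtil; each cleanup runs on a
    // single-thread executor, and dropHTables closes the HBaseAdmin when done.
    import java.util.Collections;

    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.kylin.common.util.HadoopUtil;
    import org.apache.kylin.storage.hbase.HBaseConnection;
    import org.apache.kylin.storage.hbase.util.StorageCleanUtil;

    public class StorageCleanExample {
        public static void main(String[] args) throws Exception {
            // drop one (hypothetical) segment HTable
            StorageCleanUtil.dropHTables(
                    new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()),
                    Collections.singletonList("KYLIN_EXAMPLE_SEGMENT_HTABLE"));

            // delete one (hypothetical) job working directory on HDFS
            StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(),
                    Collections.singletonList("/kylin/kylin_metadata/kylin-example_job_id"));
        }
    }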