This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new a20f04f  KYLIN-3430 Global Dictionary Cleanup
a20f04f is described below

commit a20f04fb17ccb58162471868e5adf3594fb41c77
Author: Temple Zhou <dba...@gmail.com>
AuthorDate: Thu Dec 27 15:04:58 2018 +0800

    KYLIN-3430 Global Dictionary Cleanup
---
 .../apache/kylin/rest/job/MetadataCleanupJob.java | 49 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 5ee5c7a..e11fe74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.job;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -25,11 +26,18 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -68,6 +76,7 @@ public class MetadataCleanupJob {
         CubeManager cubeManager = CubeManager.getInstance(config);
         ResourceStore store = ResourceStore.getStore(config);
         long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS;
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
 
         List<String> toDeleteCandidates = Lists.newArrayList();
 
@@ -82,6 +91,23 @@ public class MetadataCleanupJob {
             }
         }
 
+        // find all of the global dictionaries in HDFS
+        try {
+            FileStatus[] fStatus = new FileStatus[0];
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict")));
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict")));
+            for (FileStatus status : fStatus) {
+                String path = status.getPath().toString();
+                FileStatus[] globalDicts = fs.listStatus(new Path(path));
+                for (FileStatus globalDict : globalDicts) {
+                    String globalDictPath = globalDict.getPath().toString();
+                    toDeleteCandidates.add(globalDictPath);
+                }
+            }
+        } catch (FileNotFoundException e) {
+            logger.info("Working Directory does not exist on HDFS. ");
+        }
+
         // three level resources, only dictionaries
         for (String resourceRoot : new String[] { ResourceStore.DICT_RESOURCE_ROOT }) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
@@ -102,6 +128,20 @@ public class MetadataCleanupJob {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
                 activeResources.add(segment.getStatisticsResourcePath());
+                for (String dictPath : segment.getDictionaryPaths()) {
+                    DictionaryInfo dictInfo = store.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+                    if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)){
+                        String dictObj = dictInfo.getDictionaryObject().toString();
+                        String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
+                        if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/GlobalDict" + dictInfo.getResourceDir());
+                        } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/SegmentDict" + dictInfo.getResourceDir());
+                        }
+                    }
+                }
             }
         }
         toDeleteCandidates.removeAll(activeResources);
@@ -129,7 +169,7 @@ public class MetadataCleanupJob {
         return garbageResources;
     }
 
-    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) {
+    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) throws IOException {
         if (toDeleteResources.isEmpty()) {
             logger.info("No metadata resource to clean up");
             return toDeleteResources;
         }
@@ -139,10 +179,15 @@ public class MetadataCleanupJob {
 
         if (delete) {
             ResourceStore store = ResourceStore.getStore(config);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
             for (String res : toDeleteResources) {
                 logger.info("Deleting metadata " + res);
                 try {
-                    store.deleteResource(res);
+                    if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
+                        fs.delete(new Path(res), true);
+                    } else {
+                        store.deleteResource(res);
+                    }
                 } catch (IOException e) {
                     logger.error("Failed to delete resource " + res, e);
                 }