This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new a7b7edb  Add option to remove stale files under job_tmp for StorageCleanupJob (#1732)
a7b7edb is described below

commit a7b7edba90e8810bad3b9a76cbf64241da8de4b5
Author: Xiaoxiang Yu <x...@apache.org>
AuthorDate: Wed Aug 18 12:52:13 2021 +0800

    Add option to remove stale files under job_tmp for StorageCleanupJob (#1732)
    
    * Add option to remove stale files under job_tmp for StorageCleanupJob
    
    * fix code style
    
    * fix cleanupJobTmp
    
    * fix sample.sh
    
    * fix kylin.engine.build-base-cuboid-enabled conflict with cube planner
    
    * minor fix
    
    * Fix optimize job
    
    * fix typo
    
    Co-authored-by: yaqian.zhang <598593...@qq.com>
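
    For reference, a minimal sketch of how the new option could be exercised
    programmatically, mirroring the updated StorageCleanupJobTest in this patch.
    The (KylinConfig, FileSystem) constructor, execute(String[]) and the option
    names come from the diff below; KylinConfig.getInstanceFromEnv() and
    HadoopUtil.getWorkingFileSystem() are assumed here as the usual ways to
    obtain the config and working FileSystem, and the class name is illustrative
    only, not part of this commit:

        package org.apache.kylin.rest.job;

        import org.apache.hadoop.fs.FileSystem;
        import org.apache.kylin.common.KylinConfig;
        import org.apache.kylin.common.util.HadoopUtil;

        // Placed in the same package as StorageCleanupJobTest so the same constructor is accessible.
        public class StorageCleanupJobUsageSketch {
            public static void main(String[] args) throws Exception {
                KylinConfig config = KylinConfig.getInstanceFromEnv();
                FileSystem fs = HadoopUtil.getWorkingFileSystem();

                StorageCleanupJob job = new StorageCleanupJob(config, fs);
                // "--delete false" keeps this a dry run; pass "true" to actually delete.
                job.execute(new String[] {
                        "--delete", "false",
                        "--cleanupJobTmp", "true",      // new option: also scan stale job_tmp directories
                        "--cleanupThreshold", "168"     // only touch files older than 168 hours (7 days)
                });
            }
        }
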
---
 .../engine/mr/common/CuboidRecommenderUtil.java    |  13 +-
 build/conf/spark-driver-log4j.properties           |   6 +
 .../kylin/common/util/CliCommandExecutor.java      |   2 +-
 .../src/main/resources/kylin-defaults.properties   |   1 +
 kylin-it/DEPRECATED_MODULE                         |   4 +-
 .../engine/spark/utils/UpdateMetadataUtil.java     |  42 +++----
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 131 +++++++++++++++------
 .../kylin/rest/job/StorageCleanupJobTest.java      |   2 +-
 8 files changed, 134 insertions(+), 67 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index f6ae332..71d2fd1 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -51,8 +51,9 @@ public class CuboidRecommenderUtil {
         }
         CubeInstance cube = segment.getCubeInstance();
         long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
-        if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
-                || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
+        if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
+                || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L)
+                && segment.getConfig().isBuildBaseCuboid()) {
             logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
             return null;
         }
@@ -77,7 +78,8 @@ public class CuboidRecommenderUtil {
         Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsReaderUtil
                 .readCuboidStatsAndSizeFromCube(currentCuboids, cube);
         long baseCuboid = cuboidScheduler.getBaseCuboidId();
-        if (statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L) {
+        if ((statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L)
+                && cube.getConfig().isBuildBaseCuboid()) {
             logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
             return null;
         }
@@ -121,8 +123,9 @@ public class CuboidRecommenderUtil {
         }
         CubeInstance cube = segment.getCubeInstance();
         long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
-        if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
-                || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
+        if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
+                || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L)
+                && segment.getConfig().isBuildBaseCuboid()) {
             logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
             return null;
         }
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
index 04c648c..364a6ed 100644
--- a/build/conf/spark-driver-log4j.properties
+++ b/build/conf/spark-driver-log4j.properties
@@ -22,6 +22,11 @@ log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
 log4j.logger.org.apache.spark=WARN
 
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
 # hdfs file appender
 log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
 log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled}
@@ -36,6 +41,7 @@ log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayou
 #Don't add line number (%L) as it's too costly!
 log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
 
+## Log saved under $KYLIN_HOME/logs/spark
 log4j.appender.logFile=org.apache.log4j.FileAppender
 log4j.appender.logFile.Threshold=DEBUG
 log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 23089b0..54bab60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -152,7 +152,7 @@ public class CliCommandExecutor {
             }
 
             if (Thread.interrupted()) {
-                logger.info("CliCommandExecutor is interruppted by other, kill the sub process: " + command);
+                logger.info("CliCommandExecutor is interrupted by other, kill the sub process: " + command);
                 proc.destroy();
                 try {
                     Thread.sleep(1000);
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 384466f..badfa69 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -83,6 +83,7 @@ kylin.web.link-hadoop=
 kylin.web.link-diagnostic=
 kylin.web.contact-mail=
 kylin.server.external-acl-provider=
+kylin.source.hive.database-for-flat-table=default
 
 # Default time filter for job list, 0->current day, 1->last one day, 2->last one week, 3->last one year, 4->all
 kylin.web.default-time-filter=1
diff --git a/kylin-it/DEPRECATED_MODULE b/kylin-it/DEPRECATED_MODULE
index b593cf2..e3c19f4 100644
--- a/kylin-it/DEPRECATED_MODULE
+++ b/kylin-it/DEPRECATED_MODULE
@@ -16,4 +16,6 @@
 # limitations under the License.
 #
 
-This maven module is deprecated and maybe be removed in the future.
\ No newline at end of file
+This maven module is deprecated and may be removed in the future.
+
+Note that the path 'kylin-it/src/test/resources/query' is still used by module 'kylin-spark-project/kylin-spark-test'.
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index a02530e..6921444 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -113,30 +113,30 @@ public class UpdateMetadataUtil {
             CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config);
             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
             if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
-                throw new IllegalArgumentException(
+                logger.warn(
                         "Cuboid statistics of original segment do not exist. 
Please check the config of kylin.engine.segment-statistics-enabled.");
-            }
-            addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap);
-            addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap);
-
-            Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND);
-            Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size());
-            for (long cuboid : recommendCuboids) {
-                HLLCounter hll = cuboidHLLMap.get(cuboid);
-                if (hll == null) {
-                    logger.warn("Cannot get the row count stats for cuboid " + cuboid);
-                } else {
-                    resultCuboidHLLMap.put(cuboid, hll);
+            } else {
+                addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap);
+                addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap);
+
+                Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND);
+                Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size());
+                for (long cuboid : recommendCuboids) {
+                    HLLCounter hll = cuboidHLLMap.get(cuboid);
+                    if (hll == null) {
+                        logger.warn("Cannot get the row count stats for cuboid " + cuboid);
+                    } else {
+                        resultCuboidHLLMap.put(cuboid, hll);
+                    }
                 }
+                if (fs.exists(statisticsFile)) {
+                    fs.delete(statisticsFile, false);
+                }
+                CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir),
+                        resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount());
+                FSDataInputStream is = fs.open(statisticsFile);
+                ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
             }
-            if (fs.exists(statisticsFile)) {
-                fs.delete(statisticsFile, false);
-            }
-            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir),
-                    resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount());
-            FSDataInputStream is = fs.open(statisticsFile);
-            ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
-
             toUpdateSeg.setStatus(SegmentStatusEnum.READY_PENDING);
         } else {
             toUpdateSeg.setStatus(SegmentStatusEnum.READY);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index f4ad269..95a7a48 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -43,43 +43,75 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * Please update https://cwiki.apache.org/confluence/display/KYLIN/How+to+clean+up+storage+in+Kylin+4
+ *   if you change this class.
+ */
 public class StorageCleanupJob extends AbstractApplication {
 
     private static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
 
+    /**
+     * It is considered quite safe to remove a job_tmp path that was created 1 week ago.
+     */
+    public static final int DEFAULT_CLEANUP_HOUR_THRESHOLD = 24 * 7;
+    public static final boolean DEFAULT_CLEANUP_DICT = true;
+    public static final boolean DEFAULT_CLEANUP_SNAPSHOT = true;
+    public static final boolean DEFAULT_CLEANUP_JOB_TMP = false;
+    public static final boolean DEFAULT_CLEANUP = false;
+    private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
+    private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/";
+
     @SuppressWarnings("static-access")
-    protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false)
-            .withDescription("Delete the unused storage").create("delete");
+    protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete")
+            .hasArg().isRequired(false)
+            .withType(Boolean.class.getName())
+            .withDescription("Boolean, whether or not to do real delete operation. Default value is " + DEFAULT_CLEANUP + ", means a dry run.")
+            .create("delete");
+
     @SuppressWarnings("static-access")
     protected static final Option OPTION_CLEANUP_TABLE_SNAPSHOT = OptionBuilder.withArgName("cleanupTableSnapshot")
-            .hasArg().isRequired(false).withDescription("Delete the unused storage").create("cleanupTableSnapshot");
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict").hasArg()
-            .isRequired(false).withDescription("Delete the unused storage").create("cleanupGlobalDict");
+            .hasArg().isRequired(false)
+            .withType(Boolean.class.getName())
+            .withDescription("Boolean, whether or not to delete unreferenced snapshot files. Default value is " + DEFAULT_CLEANUP_SNAPSHOT + " .")
+            .create("cleanupTableSnapshot");
+
     @SuppressWarnings("static-access")
-    protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold").hasArg()
-            .isRequired(false).withDescription("Delete unused storage that have not been modified in how many hours")
-            .create("cleanupThreshold");
+    protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict")
+            .hasArg().isRequired(false)
+            .withType(Boolean.class.getName())
+            .withDescription("Boolean, whether or not to delete unreferenced global dict files. Default value is " + DEFAULT_CLEANUP_DICT + " .")
+            .create("cleanupGlobalDict");
 
-    private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
-    private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/";
+    @SuppressWarnings("static-access")
+    protected static final Option OPTION_CLEANUP_JOB_TMP = OptionBuilder.withArgName("cleanupJobTmp")
+            .hasArg().isRequired(false)
+            .withType(Boolean.class.getName())
+            .withDescription("Boolean, whether or not to delete job tmp files. Default value is " + DEFAULT_CLEANUP_JOB_TMP + " .")
+            .create("cleanupJobTmp");
 
-    private static final String TABLE_SNAPSHOT = "table snapshot";
-    private static final String GLOBAL_DICTIONARY = "global dictionary";
-    private static final String SEGMENT_PARQUET_FILE = "segment parquet file";
+    @SuppressWarnings("static-access")
+    protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold")
+            .hasArg().isRequired(false)
+            .withType(Integer.class.getName())
+            .withDescription(
+                    "Integer, used to specify the age threshold in hours: only unreferenced storage that has not been modified within this many hours is deleted (recent files are protected). " +
+                    "Default value is " + DEFAULT_CLEANUP_HOUR_THRESHOLD + " hours.")
+            .create("cleanupThreshold");
 
     final protected KylinConfig config;
     final protected FileSystem fs;
     final protected ExecutableManager executableManager;
 
-    protected boolean delete = false;
-    protected boolean cleanupTableSnapshot = true;
-    protected boolean cleanupGlobalDict = true;
-    protected int cleanupThreshold = 12; // 12 hour
-
+    protected boolean delete = DEFAULT_CLEANUP;
+    protected boolean cleanupTableSnapshot = DEFAULT_CLEANUP_SNAPSHOT;
+    protected boolean cleanupGlobalDict = DEFAULT_CLEANUP_DICT;
+    protected boolean cleanupJobTmp = DEFAULT_CLEANUP;
+    protected int cleanupThreshold = DEFAULT_CLEANUP_HOUR_THRESHOLD;
     protected long storageTimeCut;
 
     protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");
@@ -101,6 +133,7 @@ public class StorageCleanupJob extends AbstractApplication {
         options.addOption(OPTION_DELETE);
         options.addOption(OPTION_CLEANUP_GLOBAL_DICT);
         options.addOption(OPTION_CLEANUP_TABLE_SNAPSHOT);
+        options.addOption(OPTION_CLEANUP_JOB_TMP);
         options.addOption(OPTION_CLEANUP_THRESHOLD_HOUR);
         return options;
     }
@@ -108,13 +141,6 @@ public class StorageCleanupJob extends AbstractApplication {
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
-        logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'");
-        logger.info("cleanup table snapshot option value: '"
-                + optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT) + "'");
-        logger.info(
-                "delete global dict option value: '" + optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT) + "'");
-        logger.info("delete unused storage that have not been modified in how many hours option value: '"
-                + optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR) + "'");
         delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
         if (optionsHelper.hasOption(OPTION_CLEANUP_TABLE_SNAPSHOT)) {
             cleanupTableSnapshot = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT));
@@ -122,11 +148,18 @@ public class StorageCleanupJob extends AbstractApplication {
         if (optionsHelper.hasOption(OPTION_CLEANUP_GLOBAL_DICT)) {
             cleanupGlobalDict = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT));
         }
+        if (optionsHelper.hasOption(OPTION_CLEANUP_JOB_TMP)) {
+            cleanupJobTmp = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_JOB_TMP));
+        }
         if (optionsHelper.hasOption(OPTION_CLEANUP_THRESHOLD_HOUR)) {
             cleanupThreshold = Integer.parseInt(optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR));
         }
 
         storageTimeCut = System.currentTimeMillis() - cleanupThreshold * 3600 * 1000L;
+        Date cleanBeforeDate = new Date(storageTimeCut);
+        logger.info("===================================================================\n" +
+                        "delete : {}; cleanupTableSnapshot : {}; cleanupGlobalDict : {}; cleanupJobTmp : {}; cleanBeforeDate : {}."
+                , delete, cleanupTableSnapshot, cleanupGlobalDict, cleanupJobTmp, cleanBeforeDate);
         cleanup();
     }
 
@@ -138,7 +171,7 @@ public class StorageCleanupJob extends AbstractApplication {
         List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName)
                 .collect(Collectors.toList());
 
-        //clean up deleted projects and cubes
+        logger.info("Start to clean up unreferenced projects and cubes ...");
         List<CubeInstance> cubes = cubeManager.listAllCubes();
         Path metadataPath = new Path(config.getHdfsWorkingDirectory());
         if (fs.exists(metadataPath)) {
@@ -148,7 +181,7 @@ public class StorageCleanupJob extends AbstractApplication {
                     if (eligibleStorage(status)) {
                         String projectName = status.getPath().getName();
                         if (!projects.contains(projectName)) {
-                            cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+                            deleteOp(status.getPath(), StorageCleanType.PROJECT_DIR);
                         } else {
                             cleanupGlobalDict(projectName,
                                     cubes.stream().filter(cube -> projectName.equals(cube.getProject()))
@@ -164,14 +197,14 @@ public class StorageCleanupJob extends AbstractApplication {
             }
         }
 
-        //clean up no used segments
+        logger.info("Start to clean up unreferenced segments ...");
         for (CubeInstance cube : cubes) {
             List<String> segments = cube.getSegments().stream().map(segment -> {
                 return segment.getName() + "_" + segment.getStorageLocationIdentifier();
             }).collect(Collectors.toList());
             String project = cube.getProject();
 
-            //list all segment directory
+            // list all segment directory
             Path cubePath = new Path(config.getHdfsWorkingDirectory(project) + "/parquet/" + cube.getName());
             if (fs.exists(cubePath)) {
                 FileStatus[] segmentStatus = fs.listStatus(cubePath);
@@ -180,13 +213,26 @@ public class StorageCleanupJob extends AbstractApplication {
                         if (eligibleStorage(status)) {
                             String segment = status.getPath().getName();
                             if (!segments.contains(segment)) {
-                                cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+                                deleteOp(status.getPath(), StorageCleanType.SEGMENT_DIR);
                             }
                         }
                     }
                 }
             } else {
-                logger.warn("Cube path doesn't exist! The path is " + cubePath);
+                logger.warn("Cube path doesn't exist! The path is {}", cubePath);
+            }
+        }
+
+        if (cleanupJobTmp) {
+            logger.info("Start to clean up stale job_tmp ...");
+            for (String prj : projects) {
+                Path prjPath = new Path(config.getJobTmpDir(prj));
+                FileStatus[] jobTmpPaths = fs.listStatus(prjPath);
+                for (FileStatus status : jobTmpPaths) {
+                    if (eligibleStorage(status)) {
+                        deleteOp(status.getPath(), StorageCleanType.JOB_TMP);
+                    }
+                }
             }
         }
     }
@@ -201,7 +247,7 @@ public class StorageCleanupJob extends AbstractApplication {
                     if (eligibleStorage(status)) {
                         String cubeName = status.getPath().getName();
                         if (!cubes.contains(cubeName)) {
-                            cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+                            deleteOp(status.getPath(), StorageCleanType.CUBE_DIR);
                         }
                     }
                 }
@@ -237,7 +283,7 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         for (Path path : toDeleteSnapshot) {
-            cleanupStorage(path, TABLE_SNAPSHOT);
+            deleteOp(path, StorageCleanType.TABLE_SNAPSHOT);
         }
     }
 
@@ -279,16 +325,16 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         for (Path path : toDeleteDict) {
-            cleanupStorage(path, GLOBAL_DICTIONARY);
+            deleteOp(path, StorageCleanType.GLOBAL_DICTIONARY);
         }
     }
 
-    private void cleanupStorage(Path path, String storageType) throws IOException {
+    private void deleteOp(Path path, StorageCleanType type) throws IOException {
         if (delete) {
-            logger.info("Deleting unused {}, {}", storageType, path);
+            logger.info("Deleting unreferenced {}, {}", type, path);
             fs.delete(path, true);
         } else {
-            logger.info("Dry run, pending delete unused {}, {}", storageType, path);
+            logger.info("Dry run, pending delete unreferenced path {}, {}", type, path);
         }
     }
 
@@ -296,3 +342,12 @@ public class StorageCleanupJob extends AbstractApplication {
         return status != null && status.getModificationTime() < storageTimeCut;
     }
 }
+
+enum StorageCleanType {
+    PROJECT_DIR,
+    GLOBAL_DICTIONARY,
+    TABLE_SNAPSHOT,
+    CUBE_DIR,
+    SEGMENT_DIR,
+    JOB_TMP
+}
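
The age check used above reduces to a single cutoff: with the new default cleanupThreshold of 24 * 7 = 168 hours, storageTimeCut = now - 168 * 3600 * 1000 ms, and a path is eligible for cleanup only if its modification time is older than that cutoff. A minimal, self-contained sketch of the same arithmetic follows; the class and variable names here are illustrative only, not part of the patch:

    import java.util.Date;

    public class CleanupCutoffSketch {
        public static void main(String[] args) {
            int cleanupThreshold = 24 * 7; // DEFAULT_CLEANUP_HOUR_THRESHOLD, i.e. one week in hours
            long storageTimeCut = System.currentTimeMillis() - cleanupThreshold * 3600 * 1000L;
            System.out.println("Paths last modified before " + new Date(storageTimeCut)
                    + " are considered stale.");

            // Same predicate as eligibleStorage(FileStatus): stale iff modified before the cut.
            long tenDaysAgo = System.currentTimeMillis() - 10L * 24 * 3600 * 1000;
            System.out.println("10-day-old path eligible: " + (tenDaysAgo < storageTimeCut)); // true
        }
    }
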
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index 9f1f62d..5121da3 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -64,7 +64,7 @@ public class StorageCleanupJobTest {
         prepareHDFSFiles(basePath, mockFs);
 
         StorageCleanupJob job = new StorageCleanupJob(kylinConfig, mockFs);
-        job.execute(new String[] { "--delete", "true" });
+        job.execute(new String[] { "--delete", "true", "--cleanupThreshold", "12" });
 
         ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
         verify(mockFs, times(6)).delete(pathCaptor.capture(), eq(true));
