This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new a7b7edb Add option to remove stale files under job_tmp for StorageCleanupJob (#1732)
a7b7edb is described below
commit a7b7edba90e8810bad3b9a76cbf64241da8de4b5
Author: Xiaoxiang Yu <[email protected]>
AuthorDate: Wed Aug 18 12:52:13 2021 +0800
Add option to remove stale files under job_tmp for StorageCleanupJob (#1732)
* Add option to remove stale files under job_tmp for StorageCleanupJob
* fix code style
* fix cleanupJobTmp
* fix sample.sh
* fix kylin.engine.build-base-cuboid-enabled conflict with cube planner
* minor fix
* Fix optimize job
* fix typo
Co-authored-by: yaqian.zhang <[email protected]>
---
.../engine/mr/common/CuboidRecommenderUtil.java | 13 +-
build/conf/spark-driver-log4j.properties | 6 +
.../kylin/common/util/CliCommandExecutor.java | 2 +-
.../src/main/resources/kylin-defaults.properties | 1 +
kylin-it/DEPRECATED_MODULE | 4 +-
.../engine/spark/utils/UpdateMetadataUtil.java | 42 +++----
.../apache/kylin/rest/job/StorageCleanupJob.java | 131 +++++++++++++++------
.../kylin/rest/job/StorageCleanupJobTest.java | 2 +-
8 files changed, 134 insertions(+), 67 deletions(-)
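For anyone trying the new switch, a minimal sketch of a dry-run invocation, modeled on the updated StorageCleanupJobTest at the end of this patch (the kylinConfig and fileSystem variables are placeholders; without --delete the job stays in dry-run mode):

    // Dry run: stale job_tmp directories older than the threshold are only reported, not deleted.
    StorageCleanupJob job = new StorageCleanupJob(kylinConfig, fileSystem);
    job.execute(new String[] { "--cleanupJobTmp", "true", "--cleanupThreshold", "168" });
    // Re-run with "--delete", "true" once the dry-run output looks correct.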
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index f6ae332..71d2fd1 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -51,8 +51,9 @@ public class CuboidRecommenderUtil {
}
CubeInstance cube = segment.getCubeInstance();
long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
- if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
- || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
+ if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
+ || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L)
+ && segment.getConfig().isBuildBaseCuboid()) {
logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
return null;
}
@@ -77,7 +78,8 @@ public class CuboidRecommenderUtil {
Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsReaderUtil
.readCuboidStatsAndSizeFromCube(currentCuboids, cube);
long baseCuboid = cuboidScheduler.getBaseCuboidId();
- if (statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L) {
+ if ((statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L)
+ && cube.getConfig().isBuildBaseCuboid()) {
logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
return null;
}
@@ -121,8 +123,9 @@ public class CuboidRecommenderUtil {
}
CubeInstance cube = segment.getCubeInstance();
long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
- if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
- || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
+ if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
+ || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L)
+ && segment.getConfig().isBuildBaseCuboid()) {
logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
return null;
}
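The isBuildBaseCuboid() guard added above is what resolves the kylin.engine.build-base-cuboid-enabled conflict with the cube planner mentioned in the commit message; a rough sketch of the intended check, using names from this diff (the baseCuboidRows local is illustrative):

    // Treat a missing or zero base cuboid row estimate as fatal only when the base
    // cuboid is actually expected to be built; otherwise the recommender keeps going.
    Long baseCuboidRows = cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid);
    if ((baseCuboidRows == null || baseCuboidRows == 0L) && segment.getConfig().isBuildBaseCuboid()) {
        logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO);
        return null;
    }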
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
index 04c648c..364a6ed 100644
--- a/build/conf/spark-driver-log4j.properties
+++ b/build/conf/spark-driver-log4j.properties
@@ -22,6 +22,11 @@ log4j.logger.org.apache.kylin=DEBUG
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.spark=WARN
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
# hdfs file appender
log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled}
@@ -36,6 +41,7 @@ log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
#Don't add line number (%L) as it's too costly!
log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+## Log saved under $KYLIN_HOME/logs/spark
log4j.appender.logFile=org.apache.log4j.FileAppender
log4j.appender.logFile.Threshold=DEBUG
log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
index 23089b0..54bab60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java
@@ -152,7 +152,7 @@ public class CliCommandExecutor {
}
if (Thread.interrupted()) {
- logger.info("CliCommandExecutor is interruppted by other, kill
the sub process: " + command);
+ logger.info("CliCommandExecutor is interrupted by other, kill
the sub process: " + command);
proc.destroy();
try {
Thread.sleep(1000);
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 384466f..badfa69 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -83,6 +83,7 @@ kylin.web.link-hadoop=
kylin.web.link-diagnostic=
kylin.web.contact-mail=
kylin.server.external-acl-provider=
+kylin.source.hive.database-for-flat-table=default
# Default time filter for job list, 0->current day, 1->last one day, 2->last one week, 3->last one year, 4->all
kylin.web.default-time-filter=1
diff --git a/kylin-it/DEPRECATED_MODULE b/kylin-it/DEPRECATED_MODULE
index b593cf2..e3c19f4 100644
--- a/kylin-it/DEPRECATED_MODULE
+++ b/kylin-it/DEPRECATED_MODULE
@@ -16,4 +16,6 @@
# limitations under the License.
#
-This maven module is deprecated and maybe be removed in the future.
\ No newline at end of file
+This maven module is deprecated and maybe be removed in the future.
+
+Remind that the path 'kylin-it/src/test/resources/query' is still be used by module 'kylin-spark-project/kylin-spark-test' .
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index a02530e..6921444 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -113,30 +113,30 @@ public class UpdateMetadataUtil {
CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config);
Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
- throw new IllegalArgumentException(
+ logger.warn(
"Cuboid statistics of original segment do not exist.
Please check the config of kylin.engine.segment-statistics-enabled.");
- }
- addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap);
- addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap);
-
- Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND);
- Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size());
- for (long cuboid : recommendCuboids) {
- HLLCounter hll = cuboidHLLMap.get(cuboid);
- if (hll == null) {
- logger.warn("Cannot get the row count stats for cuboid " + cuboid);
- } else {
- resultCuboidHLLMap.put(cuboid, hll);
+ } else {
+ addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap);
+ addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap);
+
+ Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND);
+ Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size());
+ for (long cuboid : recommendCuboids) {
+ HLLCounter hll = cuboidHLLMap.get(cuboid);
+ if (hll == null) {
+ logger.warn("Cannot get the row count stats for cuboid " + cuboid);
+ } else {
+ resultCuboidHLLMap.put(cuboid, hll);
+ }
}
+ if (fs.exists(statisticsFile)) {
+ fs.delete(statisticsFile, false);
+ }
+ CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir),
+ resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount());
+ FSDataInputStream is = fs.open(statisticsFile);
+ ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
}
- if (fs.exists(statisticsFile)) {
- fs.delete(statisticsFile, false);
- }
- CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir),
- resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount());
- FSDataInputStream is = fs.open(statisticsFile);
- ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
-
toUpdateSeg.setStatus(SegmentStatusEnum.READY_PENDING);
} else {
toUpdateSeg.setStatus(SegmentStatusEnum.READY);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index f4ad269..95a7a48 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -43,43 +43,75 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
+/**
+ * Please update https://cwiki.apache.org/confluence/display/KYLIN/How+to+clean+up+storage+in+Kylin+4
+ * if you change this class.
+ */
public class StorageCleanupJob extends AbstractApplication {
private static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
+ /**
+ * It is considered quite safe to remove job_tmp path which was created 1 week ago .
+ */
+ public static final int DEFAULT_CLEANUP_HOUR_THRESHOLD = 24 * 7;
+ public static final boolean DEFAULT_CLEANUP_DICT = true;
+ public static final boolean DEFAULT_CLEANUP_SNAPSHOT = true;
+ public static final boolean DEFAULT_CLEANUP_JOB_TMP = false;
+ public static final boolean DEFAULT_CLEANUP = false;
+ private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
+ private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/";
+
@SuppressWarnings("static-access")
- protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false)
- .withDescription("Delete the unused storage").create("delete");
+ protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete")
+ .hasArg().isRequired(false)
+ .withType(Boolean.class.getName())
+ .withDescription("Boolean, whether or not to do real delete
operation. Default value is " + DEFAULT_CLEANUP + ", means a dry run.")
+ .create("delete");
+
@SuppressWarnings("static-access")
protected static final Option OPTION_CLEANUP_TABLE_SNAPSHOT = OptionBuilder.withArgName("cleanupTableSnapshot")
- .hasArg().isRequired(false).withDescription("Delete the unused storage").create("cleanupTableSnapshot");
- @SuppressWarnings("static-access")
- protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict").hasArg()
- .isRequired(false).withDescription("Delete the unused storage").create("cleanupGlobalDict");
+ .hasArg().isRequired(false)
+ .withType(Boolean.class.getName())
+ .withDescription("Boolean, whether or not to delete unreferenced
snapshot files. Default value is " + DEFAULT_CLEANUP_SNAPSHOT + " .")
+ .create("cleanupTableSnapshot");
+
@SuppressWarnings("static-access")
- protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold").hasArg()
- .isRequired(false).withDescription("Delete unused storage that have not been modified in how many hours")
- .create("cleanupThreshold");
+ protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict")
+ .hasArg().isRequired(false)
+ .withType(Boolean.class.getName())
+ .withDescription("Boolean, whether or not to delete unreferenced
global dict files. Default value is " + DEFAULT_CLEANUP_DICT + " .")
+ .create("cleanupGlobalDict");
- private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
- private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/";
+ @SuppressWarnings("static-access")
+ protected static final Option OPTION_CLEANUP_JOB_TMP = OptionBuilder.withArgName("cleanupJobTmp")
+ .hasArg().isRequired(false)
+ .withType(Boolean.class.getName())
+ .withDescription("Boolean, whether or not to delete job tmp files.
Default value is " + DEFAULT_CLEANUP_JOB_TMP + " .")
+ .create("cleanupJobTmp");
- private static final String TABLE_SNAPSHOT = "table snapshot";
- private static final String GLOBAL_DICTIONARY = "global dictionary";
- private static final String SEGMENT_PARQUET_FILE = "segment parquet file";
+ @SuppressWarnings("static-access")
+ protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold")
+ .hasArg().isRequired(false)
+ .withType(Integer.class.getName())
+ .withDescription(
+ "Integer, used to specific delete unreferenced storage
that have not been modified before how many hours (recent files are protected).
" +
+ "Default value is " + DEFAULT_CLEANUP_HOUR_THRESHOLD + "
hours.")
+ .create("cleanupThreshold");
final protected KylinConfig config;
final protected FileSystem fs;
final protected ExecutableManager executableManager;
- protected boolean delete = false;
- protected boolean cleanupTableSnapshot = true;
- protected boolean cleanupGlobalDict = true;
- protected int cleanupThreshold = 12; // 12 hour
-
+ protected boolean delete = DEFAULT_CLEANUP;
+ protected boolean cleanupTableSnapshot = DEFAULT_CLEANUP_SNAPSHOT;
+ protected boolean cleanupGlobalDict = DEFAULT_CLEANUP_DICT;
+ protected boolean cleanupJobTmp = DEFAULT_CLEANUP;
+ protected int cleanupThreshold = DEFAULT_CLEANUP_HOUR_THRESHOLD;
protected long storageTimeCut;
protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");
@@ -101,6 +133,7 @@ public class StorageCleanupJob extends AbstractApplication {
options.addOption(OPTION_DELETE);
options.addOption(OPTION_CLEANUP_GLOBAL_DICT);
options.addOption(OPTION_CLEANUP_TABLE_SNAPSHOT);
+ options.addOption(OPTION_CLEANUP_JOB_TMP);
options.addOption(OPTION_CLEANUP_THRESHOLD_HOUR);
return options;
}
@@ -108,13 +141,6 @@ public class StorageCleanupJob extends AbstractApplication {
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
logger.info("options: '" + optionsHelper.getOptionsAsString() + "'");
- logger.info("delete option value: '" +
optionsHelper.getOptionValue(OPTION_DELETE) + "'");
- logger.info("cleanup table snapshot option value: '"
- + optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT) + "'");
- logger.info(
- "delete global dict option value: '" +
optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT) + "'");
- logger.info("delete unused storage that have not been modified in how
many hours option value: '"
- + optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR) + "'");
delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE));
if (optionsHelper.hasOption(OPTION_CLEANUP_TABLE_SNAPSHOT)) {
cleanupTableSnapshot = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT));
@@ -122,11 +148,18 @@ public class StorageCleanupJob extends AbstractApplication {
if (optionsHelper.hasOption(OPTION_CLEANUP_GLOBAL_DICT)) {
cleanupGlobalDict = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT));
}
+ if (optionsHelper.hasOption(OPTION_CLEANUP_JOB_TMP)) {
+ cleanupJobTmp = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_JOB_TMP));
+ }
if (optionsHelper.hasOption(OPTION_CLEANUP_THRESHOLD_HOUR)) {
cleanupThreshold = Integer.parseInt(optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR));
}
storageTimeCut = System.currentTimeMillis() - cleanupThreshold * 3600 * 1000L;
+ Date cleanBeforeDate = new Date(storageTimeCut);
+ logger.info("===================================================================\n" +
+ "delete : {}; cleanupTableSnapshot : {}; cleanupGlobalDict : {}; cleanupJobTmp : {}; cleanBeforeDate : {}."
+ , delete, cleanupTableSnapshot, cleanupGlobalDict, cleanupJobTmp, cleanBeforeDate);
cleanup();
}
@@ -138,7 +171,7 @@ public class StorageCleanupJob extends AbstractApplication {
List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName)
.collect(Collectors.toList());
- //clean up deleted projects and cubes
+ logger.info("Start to clean up unreferenced projects and cubes ...");
List<CubeInstance> cubes = cubeManager.listAllCubes();
Path metadataPath = new Path(config.getHdfsWorkingDirectory());
if (fs.exists(metadataPath)) {
@@ -148,7 +181,7 @@ public class StorageCleanupJob extends AbstractApplication {
if (eligibleStorage(status)) {
String projectName = status.getPath().getName();
if (!projects.contains(projectName)) {
- cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+ deleteOp(status.getPath(), StorageCleanType.PROJECT_DIR);
} else {
cleanupGlobalDict(projectName,
cubes.stream().filter(cube -> projectName.equals(cube.getProject()))
@@ -164,14 +197,14 @@ public class StorageCleanupJob extends AbstractApplication {
}
}
- //clean up no used segments
+ logger.info("Start to clean up no unreferenced segments ...");
for (CubeInstance cube : cubes) {
List<String> segments = cube.getSegments().stream().map(segment -> {
return segment.getName() + "_" + segment.getStorageLocationIdentifier();
}).collect(Collectors.toList());
String project = cube.getProject();
- //list all segment directory
+ // list all segment directory
Path cubePath = new Path(config.getHdfsWorkingDirectory(project) + "/parquet/" + cube.getName());
if (fs.exists(cubePath)) {
FileStatus[] segmentStatus = fs.listStatus(cubePath);
@@ -180,13 +213,26 @@ public class StorageCleanupJob extends AbstractApplication {
if (eligibleStorage(status)) {
String segment = status.getPath().getName();
if (!segments.contains(segment)) {
- cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+ deleteOp(status.getPath(), StorageCleanType.SEGMENT_DIR);
}
}
}
}
} else {
- logger.warn("Cube path doesn't exist! The path is " +
cubePath);
+ logger.warn("Cube path doesn't exist! The path is {}",
cubePath);
+ }
+ }
+
+ if (cleanupJobTmp) {
+ logger.info("Start to clean up stale job_tmp ...");
+ for (String prj : projects) {
+ Path prjPath = new Path(config.getJobTmpDir(prj));
+ FileStatus[] jobTmpPaths = fs.listStatus(prjPath);
+ for (FileStatus status : jobTmpPaths) {
+ if (eligibleStorage(status)) {
+ deleteOp(status.getPath(), StorageCleanType.JOB_TMP);
+ }
+ }
}
}
}
@@ -201,7 +247,7 @@ public class StorageCleanupJob extends AbstractApplication {
if (eligibleStorage(status)) {
String cubeName = status.getPath().getName();
if (!cubes.contains(cubeName)) {
- cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE);
+ deleteOp(status.getPath(), StorageCleanType.CUBE_DIR);
}
}
}
@@ -237,7 +283,7 @@ public class StorageCleanupJob extends AbstractApplication {
}
for (Path path : toDeleteSnapshot) {
- cleanupStorage(path, TABLE_SNAPSHOT);
+ deleteOp(path, StorageCleanType.TABLE_SNAPSHOT);
}
}
@@ -279,16 +325,16 @@ public class StorageCleanupJob extends AbstractApplication {
}
for (Path path : toDeleteDict) {
- cleanupStorage(path, GLOBAL_DICTIONARY);
+ deleteOp(path, StorageCleanType.GLOBAL_DICTIONARY);
}
}
- private void cleanupStorage(Path path, String storageType) throws IOException {
+ private void deleteOp(Path path, StorageCleanType type) throws IOException {
if (delete) {
- logger.info("Deleting unused {}, {}", storageType, path);
+ logger.info("Deleting unreferenced {}, {}", type, path);
fs.delete(path, true);
} else {
- logger.info("Dry run, pending delete unused {}, {}", storageType,
path);
+ logger.info("Dry run, pending delete unreferenced path {}, {}",
type, path);
}
}
@@ -296,3 +342,12 @@ public class StorageCleanupJob extends AbstractApplication {
return status != null && status.getModificationTime() < storageTimeCut;
}
}
+
+enum StorageCleanType {
+ PROJECT_DIR,
+ GLOBAL_DICTIONARY,
+ TABLE_SNAPSHOT,
+ CUBE_DIR,
+ SEGMENT_DIR,
+ JOB_TMP
+}
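With DEFAULT_CLEANUP_HOUR_THRESHOLD now set to 24 * 7, anything modified within the last week is protected by default, which is presumably why the test change below passes --cleanupThreshold 12 explicitly; a quick sketch of the cut-off arithmetic as the patch computes it:

    // Files whose modification time is newer than storageTimeCut are never touched.
    int cleanupThreshold = 24 * 7; // DEFAULT_CLEANUP_HOUR_THRESHOLD, i.e. 168 hours
    long storageTimeCut = System.currentTimeMillis() - cleanupThreshold * 3600 * 1000L;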
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index 9f1f62d..5121da3 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -64,7 +64,7 @@ public class StorageCleanupJobTest {
prepareHDFSFiles(basePath, mockFs);
StorageCleanupJob job = new StorageCleanupJob(kylinConfig, mockFs);
- job.execute(new String[] { "--delete", "true" });
+ job.execute(new String[] { "--delete", "true", "--cleanupThreshold", "12" });
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(mockFs, times(6)).delete(pathCaptor.capture(), eq(true));