Repository: kylin Updated Branches: refs/heads/master fa5c9cb78 -> 858fad676
KYLIN-1828 StorageCleanupJob Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/858fad67 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/858fad67 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/858fad67 Branch: refs/heads/master Commit: 858fad676bb42366c26f4033fcc208675cb4bf72 Parents: fa5c9cb Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Fri Sep 23 17:32:24 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Sep 23 17:44:23 2016 +0800 ---------------------------------------------------------------------- .../storage/hbase/util/StorageCleanupJob.java | 57 ++++++++++++++++---- 1 file changed, 48 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/858fad67/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index 4bd2c53..dffce36 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -51,6 +52,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.realization.IRealizationConstants; @@ -61,11 +63,13 @@ public class StorageCleanupJob extends AbstractApplication { @SuppressWarnings("static-access") protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete"); + protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force"); protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class); public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute protected boolean delete = false; + protected boolean force = false; protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); private void cleanUnusedHBaseTables(Configuration conf) throws IOException { @@ -132,6 +136,7 @@ public class StorageCleanupJob extends AbstractApplication { protected Options getOptions() { Options options = new Options(); options.addOption(OPTION_DELETE); + options.addOption(OPTION_FORCE); return options; } @@ -139,7 +144,9 @@ public class StorageCleanupJob extends AbstractApplication { protected void execute(OptionsHelper optionsHelper) throws Exception { logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'"); + logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'"); delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE)); + force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE)); Configuration conf = HBaseConfiguration.create(); @@ -183,6 +190,7 @@ public class StorageCleanupJob extends AbstractApplication { // GlobFilter filter = new // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() // + "/kylin-.*"); + // TODO: when first use, /kylin/kylin_metadata does not exist. FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())); for (FileStatus status : fStatus) { String path = status.getPath().getName(); @@ -242,6 +250,8 @@ public class StorageCleanupJob extends AbstractApplication { final KylinConfig config = KylinConfig.getInstanceFromEnv(); final CliCommandExecutor cmdExec = config.getCliCommandExecutor(); final int uuidLength = 36; + final String preFix = "kylin_intermediate_"; + final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";"; final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); @@ -260,7 +270,6 @@ public class StorageCleanupJob extends AbstractApplication { for (String jobId : allJobs) { // only remove FINISHED and DISCARDED job intermediate table final ExecutableState state = executableManager.getOutput(jobId).getState(); - if (!state.isFinalState()) { workingJobList.add(jobId); logger.info("Skip intermediate hive table with job id " + jobId + " with job status " + state); @@ -268,18 +277,35 @@ public class StorageCleanupJob extends AbstractApplication { } while ((line = reader.readLine()) != null) { - if (line.startsWith("kylin_intermediate_")) { - boolean isNeedDel = false; + if (!line.startsWith(preFix)) + continue; + + if (force == true) { + logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!"); + allHiveTablesNeedToBeDeleted.add(line); + continue; + } + + boolean isNeedDel = true; + + if (line.length() > preFix.length() + uuidLength) { String uuid = line.substring(line.length() - uuidLength, line.length()); uuid = uuid.replace("_", "-"); - //Check whether it's a hive table in use - if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) { - isNeedDel = true; + final Pattern UUId_PATTERN = Pattern.compile(uuidPattern); + if (UUId_PATTERN.matcher(uuid).matches()) { + //Check whether it's a hive table in use + if (isTableInUse(uuid, workingJobList)) { + isNeedDel = false; + } + } else { + isNeedDel = false; } + } else { + isNeedDel = false; + } - if (isNeedDel) { - allHiveTablesNeedToBeDeleted.add(line); - } + if (isNeedDel) { + allHiveTablesNeedToBeDeleted.add(line); } } @@ -308,6 +334,19 @@ public class StorageCleanupJob extends AbstractApplication { reader.close(); } + private boolean isTableInUse(String segUuid, List<String> workingJobList) { + for (String jobId : workingJobList) { + AbstractExecutable abstractExecutable = executableManager.getJob(jobId); + String segmentId = abstractExecutable.getParam("segmentId"); + + if (null == segmentId) + continue; + + return segUuid.equals(segmentId); + } + return false; + } + public static void main(String[] args) throws Exception { StorageCleanupJob cli = new StorageCleanupJob(); cli.execute(args);