StorageCleanupJob: delete intermediate tables kept by kylin.hive.keep.flat.table=true
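
With kylin.hive.keep.flat.table=true the intermediate flat table outlives the cube build, so dropping the Hive table alone leaves its external data directory behind on HDFS. StorageCleanupJob now records a segmentId-to-jobId map from each job's "segmentId" param, recovers the segment id from the trailing UUID in the intermediate table's name (underscores mapped back to hyphens), and also deletes the table's external path under the owning job's HDFS working directory.
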
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/91ffb47f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/91ffb47f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/91ffb47f

Branch: refs/heads/yang21-hbase1.x
Commit: 91ffb47fa8bcb709c0b1e0857dcdb6094b9d3a08
Parents: 2206974
Author: Hongbin Ma <mahong...@apache.org>
Authored: Wed Nov 2 11:13:24 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Wed Nov 2 11:14:14 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/StorageCleanupJob.java | 61 ++++++++++++++++----
 1 file changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/91ffb47f/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
index 56681af..2a2d1f3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
+++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -63,6 +65,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 public class StorageCleanupJob extends AbstractApplication {
 
@@ -252,6 +255,7 @@ public class StorageCleanupJob extends AbstractApplication {
 
     private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception {
         final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         final CliCommandExecutor cmdExec = config.getCliCommandExecutor();
         final int uuidLength = 36;
         final String preFix = "kylin_intermediate_";
@@ -269,6 +273,7 @@ public class StorageCleanupJob extends AbstractApplication {
         List<String> allJobs = executableManager.getAllJobIds();
         List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
         List<String> workingJobList = new ArrayList<String>();
+        Map<String, String> segmentId2JobId = Maps.newHashMap();
 
         StringBuilder sb = new StringBuilder();
         for (String jobId : allJobs) {
@@ -278,6 +283,11 @@ public class StorageCleanupJob extends AbstractApplication {
                 workingJobList.add(jobId);
                 sb.append(jobId).append("(").append(state).append("), ");
             }
+
+            String segmentId = getSegmentIdFromJobId(jobId);
+            if (segmentId != null) {//some jobs are not cubing jobs
+                segmentId2JobId.put(segmentId, jobId);
+            }
         }
         logger.info("Working jobIDs: " + workingJobList);
 
@@ -302,15 +312,15 @@ public class StorageCleanupJob extends AbstractApplication {
                 if (UUId_PATTERN.matcher(uuid).matches()) {
                     //Check whether it's a hive table in use
                     if (isTableInUse(uuid, workingJobList)) {
-                        logger.info("Skip because not isTableInUse");
+                        logger.info("Skip deleting because the table is in use");
                         isNeedDel = false;
                     }
                 } else {
-                    logger.info("Skip because not match pattern");
+                    logger.info("Skip deleting because not match pattern");
                     isNeedDel = false;
                 }
             } else {
-                logger.info("Skip because length not qualified");
+                logger.info("Skip deleting because length not qualified");
                 isNeedDel = false;
             }
 
@@ -320,19 +330,41 @@ public class StorageCleanupJob extends AbstractApplication {
         }
 
         if (delete == true) {
-            final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement(useDatabaseHql);
-            for (String delHive : allHiveTablesNeedToBeDeleted) {
-                hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
-                logger.info("Remove " + delHive + " from hive tables.");
-            }
             try {
+                final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";";
+                final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+                hiveCmdBuilder.addStatement(useDatabaseHql);
+                for (String delHive : allHiveTablesNeedToBeDeleted) {
+                    hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; ");
+                    logger.info("Remove " + delHive + " from hive tables.");
+                }
                 cmdExec.execute(hiveCmdBuilder.build());
+
+                //if kylin.hive.keep.flat.table, some intermediate table might be kept
+                //delete external path
+                for (String tableToDelete : allHiveTablesNeedToBeDeleted) {
+                    String uuid = tableToDelete.substring(tableToDelete.length() - uuidLength, tableToDelete.length());
+                    String segmentId = uuid.replace("_", "-");
+
+                    if (segmentId2JobId.containsKey(segmentId)) {
+                        String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), segmentId2JobId.get(segmentId)) + "/" + tableToDelete;
+                        Path externalDataPath = new Path(path);
+                        FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                        if (fs.exists(externalDataPath)) {
+                            fs.delete(externalDataPath, true);
+                            logger.info("Hive table {}'s external path {} deleted", tableToDelete, path);
+                        } else {
+                            logger.info("Hive table {}'s external path {} not exist. It's normal if kylin.hive.keep.flat.table set false (By default)", tableToDelete, path);
+                        }
+                    } else {
+                        logger.warn("Hive table {}'s job ID not found, segmentId2JobId: {}", tableToDelete, segmentId2JobId.toString());
+                    }
+                }
             } catch (IOException e) {
                 e.printStackTrace();
             }
+
         } else {
             System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
             for (String hiveTable : allHiveTablesNeedToBeDeleted) {
@@ -342,10 +374,15 @@ public class StorageCleanupJob extends AbstractApplication {
         }
     }
 
+    private String getSegmentIdFromJobId(String jobId) {
+        AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
+        String segmentId = abstractExecutable.getParam("segmentId");
+        return segmentId;
+    }
+
     private boolean isTableInUse(String segUuid, List<String> workingJobList) {
         for (String jobId : workingJobList) {
-            AbstractExecutable abstractExecutable = executableManager.getJob(jobId);
-            String segmentId = abstractExecutable.getParam("segmentId");
+            String segmentId = getSegmentIdFromJobId(jobId);
             if (null == segmentId)
                 continue;
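
----------------------------------------------------------------------

For readers following the patch, here is a minimal standalone sketch of the new external-path cleanup step. It is a sketch, not the committed code: hdfsWorkingDir and segmentId2JobId are hypothetical stand-ins for JobEngineConfig.getHdfsWorkingDirectory() and the map the patch builds from each job's "segmentId" param, and the "{workingDir}/kylin-{jobId}/{table}" layout is assumed to match what JobBuilderSupport.getJobWorkingDir produces.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ExternalPathCleanupSketch {

    private static final int UUID_LENGTH = 36;

    // The intermediate table name ends with the segment UUID, with '-'
    // flattened to '_' (unquoted Hive identifiers cannot contain '-').
    static String segmentIdOf(String intermediateTable) {
        String flattened = intermediateTable.substring(intermediateTable.length() - UUID_LENGTH);
        return flattened.replace("_", "-");
    }

    // Delete the table's external directory under the owning job's working
    // dir; a no-op when the segment does not map to a known cubing job.
    static void deleteExternalPath(String hdfsWorkingDir, Map<String, String> segmentId2JobId,
            String tableToDelete) throws IOException {
        String jobId = segmentId2JobId.get(segmentIdOf(tableToDelete));
        if (jobId == null) {
            return; // not produced by a cubing job we know about
        }
        // Assumed to mirror JobBuilderSupport.getJobWorkingDir(...) + "/" + tableName.
        Path externalDataPath = new Path(hdfsWorkingDir + "/kylin-" + jobId + "/" + tableToDelete);
        FileSystem fs = FileSystem.get(externalDataPath.toUri(), new Configuration());
        if (fs.exists(externalDataPath)) {
            fs.delete(externalDataPath, true); // recursive, like the patch
        }
    }
}

Worked example, with hypothetical names: a table kylin_intermediate_my_cube_12ab34cd_5e6f_7a8b_9c0d_ef1234567890 yields segment id 12ab34cd-5e6f-7a8b-9c0d-ef1234567890; if that segment was built by job J, the directory {workingDir}/kylin-J/kylin_intermediate_my_cube_12ab34cd_5e6f_7a8b_9c0d_ef1234567890 is removed recursively, mirroring fs.delete(externalDataPath, true) in the patch.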