KYLIN-2145 StorageCleanupJob will fail when beeline is enabled
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cad3def9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cad3def9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cad3def9 Branch: refs/heads/KYLIN-1971 Commit: cad3def9e21fcfc07767e3a1b72f5b1bf8cae3de Parents: 7358a78 Author: Hongbin Ma <mahong...@apache.org> Authored: Tue Nov 1 18:32:31 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Nov 3 13:34:12 2016 +0800 ---------------------------------------------------------------------- tool/pom.xml | 4 ++ .../apache/kylin/tool/StorageCleanupJob.java | 44 ++++++++++---------- 2 files changed, 25 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/cad3def9/tool/pom.xml ---------------------------------------------------------------------- diff --git a/tool/pom.xml b/tool/pom.xml index e530469..e3d7bfa 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -42,6 +42,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-source-kafka</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-source-hive</artifactId> + </dependency> <!--Env--> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/cad3def9/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java index c1ff753..56681af 100644 --- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java +++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java @@ -18,9 +18,7 @@ package org.apache.kylin.tool; -import java.io.BufferedReader; import java.io.IOException; -import java.io.StringReader; import java.util.ArrayList; import 
java.util.List; import java.util.concurrent.Callable; @@ -31,6 +29,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; +import javax.annotation.Nullable; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -46,7 +46,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -56,10 +55,15 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.realization.IRealizationConstants; -import org.apache.kylin.storage.hbase.util.HiveCmdBuilder; +import org.apache.kylin.source.hive.HiveClientFactory; +import org.apache.kylin.source.hive.HiveCmdBuilder; +import org.apache.kylin.source.hive.IHiveClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + public class StorageCleanupJob extends AbstractApplication { @SuppressWarnings("static-access") @@ -244,27 +248,24 @@ public class StorageCleanupJob extends AbstractApplication { } System.out.println("-------------------------------------------------------"); } - } - private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException { + private void cleanUnusedIntermediateHiveTable(Configuration conf) throws Exception { final KylinConfig config = KylinConfig.getInstanceFromEnv(); final CliCommandExecutor cmdExec = config.getCliCommandExecutor(); final int uuidLength = 36; final String preFix = 
"kylin_intermediate_"; final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; - - final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";"; - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement(useDatabaseHql); - hiveCmdBuilder.addStatement("show tables " + "\'kylin_intermediate_*\'" + "; "); - - Pair<Integer, String> result = cmdExec.execute(hiveCmdBuilder.build()); + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + List<String> hiveTableNames = hiveClient.getHiveTableNames(config.getHiveDatabaseForIntermediateTable()); + Iterable<String> kylinIntermediates = Iterables.filter(hiveTableNames, new Predicate<String>() { + @Override + public boolean apply(@Nullable String input) { + return input != null && input.startsWith("kylin_intermediate_"); + } + }); - String outputStr = result.getSecond(); - BufferedReader reader = new BufferedReader(new StringReader(outputStr)); - String line = null; List<String> allJobs = executableManager.getAllJobIds(); List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>(); List<String> workingJobList = new ArrayList<String>(); @@ -280,15 +281,14 @@ public class StorageCleanupJob extends AbstractApplication { } logger.info("Working jobIDs: " + workingJobList); - while ((line = reader.readLine()) != null) { - + for (String line : kylinIntermediates) { logger.info("Checking table " + line); if (!line.startsWith(preFix)) continue; if (force == true) { - logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!"); + logger.warn("Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!"); allHiveTablesNeedToBeDeleted.add(line); continue; } @@ -320,7 +320,8 @@ public class StorageCleanupJob extends AbstractApplication { } if (delete == true) { - hiveCmdBuilder.reset(); + final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + 
";"; + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement(useDatabaseHql); for (String delHive : allHiveTablesNeedToBeDeleted) { hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; "); @@ -339,9 +340,6 @@ public class StorageCleanupJob extends AbstractApplication { } System.out.println("----------------------------------------------------"); } - - if (reader != null) - reader.close(); } private boolean isTableInUse(String segUuid, List<String> workingJobList) {