KYLIN-1805: Fix StorageCleanupJob getting stuck when deleting HTables
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c4731de6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c4731de6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c4731de6 Branch: refs/heads/stream_m1 Commit: c4731de61faf3806cdba8a312d3ef66feaece14d Parents: 76d60b6 Author: kyotoYaho <nju_y...@apache.org> Authored: Mon Jun 20 12:48:58 2016 +0800 Committer: Zhong <yangzh...@lm-shc-16501214.corp.ebay.com> Committed: Mon Jun 20 12:49:43 2016 +0800 ---------------------------------------------------------------------- .../storage/hbase/util/StorageCleanupJob.java | 52 +++++++++++++++----- 1 file changed, 41 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c4731de6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index a5db52f..ac35ccf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -24,6 +24,7 @@ import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.*; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -62,6 +63,8 @@ public class StorageCleanupJob extends AbstractHadoopJob { protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class); + public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute + boolean delete = false; protected static ExecutableManager executableManager = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -144,19 +147,21 @@ public class StorageCleanupJob extends AbstractHadoopJob { if (delete == true) { // drop tables + ExecutorService executorService = Executors.newSingleThreadExecutor(); for (String htableName : allTablesNeedToBeDropped) { - logger.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(htableName)) { - if (hbaseAdmin.isTableEnabled(htableName)) { - hbaseAdmin.disableTable(htableName); - } - - hbaseAdmin.deleteTable(htableName); - logger.info("Deleted HBase table " + htableName); - } else { - logger.info("HBase table" + htableName + " does not exist"); + FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName)); + executorService.execute(futureTask); + try { + futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES); + } catch (TimeoutException e) { + logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!"); + futureTask.cancel(true); + } catch (Exception e) { + e.printStackTrace(); + futureTask.cancel(true); } } + executorService.shutdown(); } else { System.out.println("--------------- Tables To Be Dropped ---------------"); for (String htableName : allTablesNeedToBeDropped) { @@ -168,6 +173,31 @@ public class StorageCleanupJob extends AbstractHadoopJob { hbaseAdmin.close(); } + class DeleteHTableRunnable implements Callable { + HBaseAdmin hbaseAdmin; + String htableName; + + DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) { + this.hbaseAdmin = hbaseAdmin; + this.htableName = htableName; + } + + public Object call() throws Exception { + logger.info("Deleting HBase table " + htableName); + if (hbaseAdmin.tableExists(htableName)) { + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); + } + + hbaseAdmin.deleteTable(htableName); + logger.info("Deleted HBase table " + htableName); + } else { + logger.info("HBase 
table" + htableName + " does not exist"); + } + return null; + } + } + private void cleanUnusedHdfsFiles(Configuration conf) throws IOException { JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -236,7 +266,7 @@ public class StorageCleanupJob extends AbstractHadoopJob { final KylinConfig config = KylinConfig.getInstanceFromEnv(); final CliCommandExecutor cmdExec = config.getCliCommandExecutor(); final int uuidLength = 36; - + final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";"; final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement(useDatabaseHql);