KYLIN-2089 Make update HBase coprocessor concurrent Signed-off-by: Li Yang <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b7e8065c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b7e8065c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b7e8065c Branch: refs/heads/KYLIN-1971 Commit: b7e8065c0b44eb45ec11fd3b498fd72652782b84 Parents: eef157c Author: kangkaisen <kangkai...@live.com> Authored: Wed Oct 12 20:12:15 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Oct 20 16:43:51 2016 +0800 ---------------------------------------------------------------------- .../hbase/util/DeployCoprocessorCLI.java | 55 ++++++++++++++++---- 1 file changed, 46 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b7e8065c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index a1193e7..cc9b988 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -23,10 +23,14 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import org.apache.commons.io.IOUtils; @@ -107,14 +111,15 @@ public class DeployCoprocessorCLI { Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); + resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); // Don't remove old jars, missing coprocessor jar will fail hbase // removeOldJars(oldJarPaths, fileSystem); hbaseAdmin.close(); - logger.info("Processed " + processedTables); + logger.info("Processed tables count: " + processedTables.size()); + logger.info("Processed tables: " + processedTables); logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); } @@ -220,18 +225,50 @@ public class DeployCoprocessorCLI { hbaseAdmin.enableTable(tableName); } - private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { - List<String> processed = new ArrayList<String>(); - - for (String tableName : tableNames) { + private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); + + private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { + ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); + + for (final String tableName : tableNames) { + coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName)); + } + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + logger.error("reset coprocessor failed: ", e); + } + + coprocessorPool.shutdown(); + } + + private static class ResetCoprocessorWorker implements Runnable { + private final CountDownLatch countDownLatch; + private final HBaseAdmin hbaseAdmin; + private final Path hdfsCoprocessorJar; + private final String tableName; + + public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) { + this.countDownLatch = countDownLatch; + this.hbaseAdmin = hbaseAdmin; + this.hdfsCoprocessorJar = hdfsCoprocessorJar; + this.tableName = tableName; + } + + @Override + public void run() { try { resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); - processed.add(tableName); - } catch (IOException ex) { + processedTables.add(tableName); + } catch (Exception ex) { logger.error("Error processing " + tableName, ex); + } finally { + countDownLatch.countDown(); } + } - return processed; } public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {