KYLIN-2109 Deploy coprocessor only this server own the table Signed-off-by: Li Yang <liy...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c7a48dd9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c7a48dd9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c7a48dd9 Branch: refs/heads/KYLIN-1971 Commit: c7a48dd93f7733b6ddf267e8b8f56ed7153a280c Parents: b7e8065 Author: kangkaisen <kangkai...@live.com> Authored: Wed Oct 19 15:45:19 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Oct 20 16:43:51 2016 +0800 ---------------------------------------------------------------------- .../hbase/util/DeployCoprocessorCLI.java | 28 +++++++++++++++----- 1 file changed, 22 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c7a48dd9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index cc9b988..f2618dc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -69,6 +69,8 @@ public class DeployCoprocessorCLI { public static final String CubeObserverClassOld = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver"; public static final String IIEndpointClassOld = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint"; public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint"; + private static KylinConfig kylinConfig; + private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class); public static void main(String[] args) throws IOException { @@ -77,7 +79,7 @@ public class DeployCoprocessorCLI { printUsageAndExit(); } - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); @@ -189,12 +191,22 @@ public class DeployCoprocessorCLI { desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null); } - public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + public static boolean resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + + //when the table has migrated from dev env to test(prod) env, the dev server + //should not reset the coprocessor of the table. + String host = desc.getValue(IRealizationConstants.HTableTag); + if (!host.equalsIgnoreCase(kylinConfig.getMetadataUrlPrefix())) { + logger.warn("This server doesn't own this table: " + tableName); + return false; + } + + logger.info("reset coprocessor on " + tableName); + logger.info("Disable " + tableName); hbaseAdmin.disableTable(tableName); - logger.info("Unset coprocessor on " + tableName); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); while (desc.hasCoprocessor(CubeObserverClass)) { desc.removeCoprocessor(CubeObserverClass); } @@ -223,6 +235,8 @@ public class DeployCoprocessorCLI { logger.info("Enable " + tableName); hbaseAdmin.enableTable(tableName); + + return true; } private static List<String> processedTables = Collections.synchronizedList(new ArrayList<String>()); @@ -260,8 +274,10 @@ public class DeployCoprocessorCLI { @Override public void run() { try { - resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); - processedTables.add(tableName); + boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); + if (isProcessed) { + processedTables.add(tableName); + } } catch (Exception ex) { logger.error("Error processing " + tableName, ex); } finally {