This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.0-hadoop3.1 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7dbd6281913c7c9701a76fe4e84a4da54d7f5ffe Author: Lijun Cao <> AuthorDate: Tue Sep 4 09:56:36 2018 +0800 KYLIN-3517 Update coprocessor on HBase2.0 is available. Signed-off-by: shaofengshi <shaofeng...@apache.org> --- .../hbase/lookup/LookupTableToHFileJob.java | 24 +++++------ .../kylin/storage/hbase/steps/CubeHTableUtil.java | 46 +++++++++++----------- .../storage/hbase/util/DeployCoprocessorCLI.java | 46 +++++++++++----------- 3 files changed, 60 insertions(+), 56 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java index 054e146..2789401 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java @@ -26,12 +26,12 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -207,24 +207,24 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { String hTableName = genHTableName(kylinConfig, admin, sourceTableName); TableName tableName = TableName.valueOf(hTableName); - HTableDescriptor hTableDesc = new HTableDescriptor(tableName); - 
hTableDesc.setCompactionEnabled(false); - hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); - hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); - hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); + TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); + descBuilder.setCompactionEnabled(false); + descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); + descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); + descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); String commitInfo = KylinVersion.getGitCommitInfo(); if (!StringUtils.isEmpty(commitInfo)) { - hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo); + descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo); } - HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false); - hTableDesc.addFamily(cf); + ColumnFamilyDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false); + descBuilder.modifyColumnFamily(cf); try { if (shardNum > 1) { - admin.createTable(hTableDesc, getSplitsByShardNum(shardNum)); + admin.createTable(descBuilder.build(), getSplitsByShardNum(shardNum)); } else { - admin.createTable(hTableDesc); + admin.createTable(descBuilder.build()); } } finally { IOUtils.closeQuietly(admin); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java index f006adb..9e3703c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java @@ 
-23,11 +23,12 @@ import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; import org.apache.hadoop.hbase.security.User; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; +import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; @@ -59,25 +61,25 @@ public class CubeHTableUtil { CubeDesc cubeDesc = cubeInstance.getDescriptor(); KylinConfig kylinConfig = cubeDesc.getConfig(); - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(cubeSegment.getStorageLocationIdentifier())); - tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); - tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); - tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); + TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(cubeSegment.getStorageLocationIdentifier())); + descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); + 
descBuilder.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); + descBuilder.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); if (!StringUtils.isEmpty(kylinConfig.getKylinOwner())) { //HTableOwner is the team that provides kylin service - tableDesc.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner()); + descBuilder.setValue(IRealizationConstants.HTableOwner, kylinConfig.getKylinOwner()); } String commitInfo = KylinVersion.getGitCommitInfo(); if (!StringUtils.isEmpty(commitInfo)) { - tableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo); + descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo); } //HTableUser is the cube owner, which will be the "user" - tableDesc.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner()); + descBuilder.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner()); - tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString()); + descBuilder.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString()); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); @@ -86,12 +88,12 @@ public class CubeHTableUtil { try { if (User.isHBaseSecurityEnabled(conf)) { // add coprocessor for bulk load - tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + descBuilder.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); } for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) { - HColumnDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry()); - tableDesc.addFamily(cf); + ColumnFamilyDescriptor cf = createColumnFamily(kylinConfig, cfDesc.getName(), cfDesc.isMemoryHungry()); + descBuilder.setColumnFamily(cf); } if (admin.tableExists(TableName.valueOf(tableName))) { @@ -100,9 
+102,9 @@ public class CubeHTableUtil { throw new RuntimeException("HBase table " + tableName + " exists!"); } - DeployCoprocessorCLI.deployCoprocessor(tableDesc); + DeployCoprocessorCLI.deployCoprocessor(descBuilder); - admin.createTable(tableDesc, splitKeys); + admin.createTable(descBuilder.build(), splitKeys); Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } finally { @@ -136,14 +138,14 @@ public class CubeHTableUtil { admin.deleteTable(tableName); } - HTableDescriptor tableDesc = new HTableDescriptor(tableName); - tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); + TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName); + descBuilder.setValue(TableDescriptorBuilder.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - tableDesc.addFamily(createColumnFamily(kylinConfig, cfName, false)); + descBuilder.modifyColumnFamily(createColumnFamily(kylinConfig, cfName, false)); logger.info("creating hbase table " + tableName); - admin.createTable(tableDesc, null); + admin.createTable(descBuilder.build(), null); Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } finally { @@ -151,8 +153,8 @@ public class CubeHTableUtil { } } - public static HColumnDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) { - HColumnDescriptor cf = new HColumnDescriptor(cfName); + public static ColumnFamilyDescriptor createColumnFamily(KylinConfig kylinConfig, String cfName, boolean isMemoryHungry) { + ColumnFamilyDescriptorBuilder cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cfName)); 
cf.setMaxVersions(1); if (isMemoryHungry) { @@ -203,7 +205,7 @@ public class CubeHTableUtil { cf.setInMemory(false); cf.setBloomFilterType(BloomType.NONE); cf.setScope(kylinConfig.getHBaseReplicationScope()); - return cf; + return cf.build(); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 46363b2..362a105 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -41,11 +41,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.Bytes; @@ -178,7 +179,7 @@ public class DeployCoprocessorCLI { } logger.info("Commit Information: " + commitInfo); for (String tableName : tableNames) { - HTableDescriptor tableDesc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + TableDescriptor tableDesc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName)); String gitTag = tableDesc.getValue(IRealizationConstants.HTableGitTag); if (commitInfo.equals(gitTag)) { filteredList.add(tableName); @@ -249,18 +250,18 @@ public class DeployCoprocessorCLI { return result; } - public static void deployCoprocessor(HTableDescriptor tableDesc) { + public static void deployCoprocessor(TableDescriptorBuilder 
desBuilder) { try { - initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor."); + initHTableCoprocessor(desBuilder); + logger.info("hbase table " + desBuilder.build().getTableName() + " deployed with coprocessor."); } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex); + logger.error("Error deploying coprocessor on " + desBuilder.build().getTableName(), ex); logger.error("Will try creating the table without coprocessor."); } } - private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { + private static void initHTableCoprocessor(TableDescriptorBuilder descBuilder) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); @@ -268,17 +269,18 @@ public class DeployCoprocessorCLI { String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null); - DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar); + DeployCoprocessorCLI.addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar); } - public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException { - logger.info("Add coprocessor on " + desc.getNameAsString()); - desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null); + public static void addCoprocessorOnHTable(TableDescriptorBuilder descBuilder, Path hdfsCoprocessorJar) throws IOException { + logger.info("Add coprocessor on " + descBuilder.build().getTableName().toString()); + descBuilder.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null); } public static boolean resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { KylinConfig kylinConfig = 
KylinConfig.getInstanceFromEnv(); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + TableDescriptor desc = hbaseAdmin.getDescriptor(TableName.valueOf(tableName)); + TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(desc); //when the table has migrated from dev env to test(prod) env, the dev server //should not reset the coprocessor of the table. @@ -294,30 +296,30 @@ public class DeployCoprocessorCLI { hbaseAdmin.disableTable(TableName.valueOf(tableName)); while (desc.hasCoprocessor(CubeObserverClassOld2)) { - desc.removeCoprocessor(CubeObserverClassOld2); + desc = descBuilder.removeCoprocessor(CubeObserverClassOld2).build(); } while (desc.hasCoprocessor(CubeEndpointClass)) { - desc.removeCoprocessor(CubeEndpointClass); + desc = descBuilder.removeCoprocessor(CubeEndpointClass).build(); } while (desc.hasCoprocessor(IIEndpointClass)) { - desc.removeCoprocessor(IIEndpointClass); + desc = descBuilder.removeCoprocessor(IIEndpointClass).build(); } // remove legacy coprocessor from v1.x while (desc.hasCoprocessor(CubeObserverClassOld)) { - desc.removeCoprocessor(CubeObserverClassOld); + desc = descBuilder.removeCoprocessor(CubeObserverClassOld).build(); } while (desc.hasCoprocessor(IIEndpointClassOld)) { - desc.removeCoprocessor(IIEndpointClassOld); + desc = descBuilder.removeCoprocessor(IIEndpointClassOld).build(); } - addCoprocessorOnHTable(desc, hdfsCoprocessorJar); + addCoprocessorOnHTable(descBuilder, hdfsCoprocessorJar); // update commit tags String commitInfo = KylinVersion.getGitCommitInfo(); if (!StringUtils.isEmpty(commitInfo)) { - desc.setValue(IRealizationConstants.HTableGitTag, commitInfo); + descBuilder.setValue(IRealizationConstants.HTableGitTag, commitInfo); } - hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.modifyTable(descBuilder.build()); logger.info("Enable " + tableName); hbaseAdmin.enableTable(TableName.valueOf(tableName)); @@ -490,9 +492,9 @@ public class 
DeployCoprocessorCLI { HashSet<String> result = new HashSet<String>(); for (String tableName : tableNames) { - HTableDescriptor tableDescriptor = null; + TableDescriptor tableDescriptor = null; try { - tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + tableDescriptor = hbaseAdmin.getDescriptor(TableName.valueOf(tableName)); } catch (TableNotFoundException e) { logger.warn("Table not found " + tableName, e); continue;