http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 36adca1..0073e07 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -145,7 +145,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
     if (shardLength == 0) {
         return;
     }
-    byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey();
+    byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
     Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
     Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
 }
@@ -181,7 +181,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
     try {
         this.serviceStartTime = System.currentTimeMillis();
-        region = env.getRegion();
+        region = (HRegion)env.getRegion();
         region.startRegionOperation();
         // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env.
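The two hunks above capture the typical coprocessor-side changes in the HBase 1.x migration: region start/end keys are now read through HRegionInfo, and RegionCoprocessorEnvironment.getRegion() returns the Region interface, which Kylin casts back to HRegion because HRegion-specific methods are still used elsewhere in the service. A minimal sketch of that access pattern, assuming a standard HBase 1.x classpath (the onStart method name and the key handling are illustrative, not Kylin's actual code):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    public class RegionAccessSketch {
        void onStart(RegionCoprocessorEnvironment env) throws IOException {
            // getRegion() now returns the Region interface; the cast mirrors the patch.
            HRegion region = (HRegion) env.getRegion();
            region.startRegionOperation();
            // Start/end keys are reached through HRegionInfo in the 1.x API.
            HRegionInfo info = region.getRegionInfo();
            byte[] regionStartKey = info.getStartKey(); // may be empty for the first region
        }
    }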
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
index 9b487a7..4a4f2a3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -78,7 +79,8 @@ public class CubeHTableUtil {
     tableDesc.setValue(IRealizationConstants.HTableSegmentTag, cubeSegment.toString());
     Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    HBaseAdmin admin = new HBaseAdmin(conf);
+    Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+    Admin admin = conn.getAdmin();
     try {
         if (User.isHBaseSecurityEnabled(conf)) {
@@ -91,7 +93,7 @@ public class CubeHTableUtil {
         tableDesc.addFamily(cf);
     }
-    if (admin.tableExists(tableName)) {
+    if (admin.tableExists(TableName.valueOf(tableName))) {
         // admin.disableTable(tableName);
         // admin.deleteTable(tableName);
         throw new RuntimeException("HBase table " + tableName + " exists!");
@@ -100,7 +102,7 @@ public class CubeHTableUtil {
     DeployCoprocessorCLI.deployCoprocessor(tableDesc);
     admin.createTable(tableDesc, splitKeys);
-    Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
+    Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons");
     logger.info("create hbase table " + tableName + " done.");
 } finally {
     admin.close();
@@ -109,8 +111,7 @@ public class CubeHTableUtil {
 }
 public static void deleteHTable(TableName tableName) throws IOException {
-    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    HBaseAdmin admin = new HBaseAdmin(conf);
+    Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
     try {
         if (admin.tableExists(tableName)) {
             logger.info("disabling hbase table " + tableName);
@@ -125,8 +126,7 @@ public class CubeHTableUtil {
 /** create a HTable that has the same performance settings as normal cube table, for benchmark purpose */
 public static void createBenchmarkHTable(TableName tableName, String cfName) throws IOException {
-    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    HBaseAdmin admin = new HBaseAdmin(conf);
+    Admin admin = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getAdmin();
    try {
        if (admin.tableExists(tableName)) {
            logger.info("disabling hbase table " + tableName);
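CubeHTableUtil shows the administrative pattern repeated throughout this patch: an Admin obtained from a Connection replaces new HBaseAdmin(conf), and the String overloads that HBaseAdmin offered are gone, so every name-based call goes through TableName.valueOf(...). A condensed sketch of the create path, assuming (as the patch does) that Kylin's HBaseConnection.get(...) hands back a usable, cached Connection; the family name and error handling are simplified placeholders:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;

    public class CreateTableSketch {
        public static void createIfAbsent(Connection conn, String tableName) throws IOException {
            Admin admin = conn.getAdmin(); // replaces new HBaseAdmin(conf)
            try {
                TableName tn = TableName.valueOf(tableName); // String overloads are gone in 1.x
                if (admin.tableExists(tn)) {
                    throw new RuntimeException("HBase table " + tableName + " exists!");
                }
                HTableDescriptor desc = new HTableDescriptor(tn);
                desc.addFamily(new HColumnDescriptor("F1")); // placeholder family
                admin.createTable(desc);
            } finally {
                admin.close(); // close the Admin, not the shared Connection
            }
        }
    }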
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
index 7aecd7e..9dc9715 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java
@@ -28,9 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -99,19 +100,21 @@ public class DeprecatedGCStep extends AbstractExecutable {
     List<String> oldTables = getOldHTables();
     if (oldTables != null && oldTables.size() > 0) {
         String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = null;
+        Admin admin = null;
         try {
-            admin = new HBaseAdmin(conf);
+
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            admin = conn.getAdmin();
+
             for (String table : oldTables) {
-                if (admin.tableExists(table)) {
-                    HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                if (admin.tableExists(TableName.valueOf(table))) {
+                    HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
                     String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                     if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                        if (admin.isTableEnabled(table)) {
-                            admin.disableTable(table);
+                        if (admin.isTableEnabled(TableName.valueOf(table))) {
+                            admin.disableTable(TableName.valueOf(table));
                         }
-                        admin.deleteTable(table);
+                        admin.deleteTable(TableName.valueOf(table));
                         logger.debug("Dropped HBase table " + table);
                         output.append("Dropped HBase table " + table + " \n");
                     } else {
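The drop sequence above is the other pattern this patch repeats (it appears again in MergeGCStep, HBaseClean, and OrphanHBaseCleanJob below). A sketch of just that sequence with the 1.x Admin API; error handling is omitted:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    public class DropTableSketch {
        static void dropIfExists(Admin admin, String table) throws IOException {
            TableName tn = TableName.valueOf(table);
            if (admin.tableExists(tn)) {
                if (admin.isTableEnabled(tn)) {
                    admin.disableTable(tn); // a table must be disabled before deletion
                }
                admin.deleteTable(tn);
            }
        }
    }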
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index 4fe7748..16955dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -38,7 +38,7 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
@@ -64,7 +64,7 @@ public class HBaseCuboidWriter implements ICuboidWriter {
     private final List<KeyValueCreator> keyValueCreators;
     private final int nColumns;
-    private final HTableInterface hTable;
+    private final Table hTable;
     private final CubeDesc cubeDesc;
     private final CubeSegment cubeSegment;
     private final Object[] measureValues;
@@ -73,7 +73,7 @@ public class HBaseCuboidWriter implements ICuboidWriter {
     private AbstractRowKeyEncoder rowKeyEncoder;
     private byte[] keybuf;
-    public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) {
+    public HBaseCuboidWriter(CubeSegment segment, Table hTable) {
         this.keyValueCreators = Lists.newArrayList();
         this.cubeSegment = segment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -132,11 +132,12 @@ public class HBaseCuboidWriter implements ICuboidWriter {
     long t = System.currentTimeMillis();
     if (hTable != null) {
         hTable.put(puts);
-        hTable.flushCommits();
     }
     logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
     puts.clear();
 }
+logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
+puts.clear();
 }
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 9adaf24..e1e2cba 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
@@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput {
     try {
         CubeSegment cubeSegment = (CubeSegment) buildable;
-        final HTableInterface hTable;
+        final Table hTable;
         hTable = createHTable(cubeSegment);
         List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
         cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
@@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
     }
 }
-private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException {
+private Table createHTable(final CubeSegment cubeSegment) throws IOException {
     final String hTableName = cubeSegment.getStorageLocationIdentifier();
     CubeHTableUtil.createHTable(cubeSegment, null);
-    final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
+    final Table hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName));
     logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
     return hTable;
 }
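These two files show the writer side of the migration: HTableInterface becomes Table, which has no flushCommits(), so the call is simply dropped. Table.put(List<Put>) is a synchronous batch write; if client-side buffering is wanted, the 1.x replacement is BufferedMutator. A sketch under that assumption (the table name is a placeholder, not Kylin's):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public class WriteSketch {
        static void writeBatch(Connection conn, List<Put> puts) throws IOException {
            try (Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE"))) {
                table.put(puts); // synchronous; no flushCommits() needed or available
            }
            // Buffered alternative, replacing the old autoflush=false mode:
            try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("KYLIN_EXAMPLE"))) {
                mutator.mutate(puts);
            } // close() flushes any pending mutations
        }
    }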
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
index 5b2441c..2f7e164 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -24,11 +24,11 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable {
     List<String> oldTables = getOldHTables();
     if (oldTables != null && oldTables.size() > 0) {
         String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = null;
+        Admin admin = null;
         try {
-            admin = new HBaseAdmin(conf);
+            Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+            admin = conn.getAdmin();
+
             for (String table : oldTables) {
-                if (admin.tableExists(table)) {
-                    HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                if (admin.tableExists(TableName.valueOf(table))) {
+                    HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table)));
                     String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                     if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                        if (admin.isTableEnabled(table)) {
-                            admin.disableTable(table);
+                        if (admin.isTableEnabled(TableName.valueOf(table))) {
+                            admin.disableTable(TableName.valueOf(table));
                         }
-                        admin.deleteTable(table);
+                        admin.deleteTable(TableName.valueOf(table));
                         logger.debug("Dropped htable: " + table);
                         output.append("HBase table " + table + " is dropped. \n");
                     } else {
\n"); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java index a150607..81a6844 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java @@ -21,11 +21,13 @@ package org.apache.kylin.storage.hbase.util; import java.io.IOException; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; @@ -38,8 +40,8 @@ public class CleanHtableCLI extends AbstractApplication { protected static final Logger logger = LoggerFactory.getLogger(CleanHtableCLI.class); private void clean() throws IOException { - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + Admin hbaseAdmin = conn.getAdmin(); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); @@ -50,7 +52,7 @@ public class CleanHtableCLI extends AbstractApplication { System.out.println(); descriptor.setValue(IRealizationConstants.HTableOwner, "dl-ebay-ky...@ebay.com"); - hbaseAdmin.modifyTable(descriptor.getNameAsString(), descriptor); + hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor); } } hbaseAdmin.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 3066fb5..49d48c3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -47,7 +46,6 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.Bytes; -import 
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 3066fb5..49d48c3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -47,7 +46,6 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -86,7 +84,7 @@ public class CubeMigrationCLI {
     private static ResourceStore srcStore;
     private static ResourceStore dstStore;
     private static FileSystem hdfsFS;
-    private static HBaseAdmin hbaseAdmin;
+    private static Admin hbaseAdmin;
     public static final String ACL_INFO_FAMILY = "i";
     private static final String ACL_TABLE_NAME = "_acl";
@@ -130,8 +128,8 @@
     checkAndGetHbaseUrl();
-    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    hbaseAdmin = new HBaseAdmin(conf);
+    Connection conn = HBaseConnection.get(srcConfig.getStorageUrl());
+    hbaseAdmin = conn.getAdmin();
     hdfsFS = FileSystem.get(new Configuration());
@@ -156,6 +154,10 @@
     } else {
         showOpts();
     }
+
+    checkMigrationSuccess(dstConfig, cubeName, true);
+
+    IOUtils.closeQuietly(hbaseAdmin);
 }
 public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -321,177 +323,172 @@
 logger.info("Executing operation: " + opt.toString());
 switch (opt.type) {
- case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - logger.info("CHANGE_HTABLE_HOST is completed"); - break; - }
- case COPY_FILE_IN_META: { - String item = (String) opt.params[0]; - RawResource res = srcStore.getResource(item); - dstStore.putResource(item, res.inputStream, res.timestamp); - res.inputStream.close(); - logger.info("Item " + item + " is copied"); - break; - }
- case COPY_DICT_OR_SNAPSHOT: { - String item = (String) opt.params[0]; - - if (item.toLowerCase().endsWith(".dict")) { - DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig); - DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig); - DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item); - - long ts = dictSrc.getLastModified(); - dictSrc.setLastModified(0);//to avoid resource store write conflict - Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig); - DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc); - dictSrc.setLastModified(ts); - - if (dictSaved == dictSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); - } else { - //dictSrc is rejected because of duplication - //modify cube's dictionary path - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(dictSaved.getResourcePath());
+ case CHANGE_HTABLE_HOST: { + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); + desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); + logger.info("CHANGE_HTABLE_HOST is completed"); + break; + }
+ case COPY_FILE_IN_META: { + String item = (String) opt.params[0]; + RawResource res = srcStore.getResource(item); + dstStore.putResource(item, res.inputStream, res.timestamp); + res.inputStream.close(); + logger.info("Item " + item + " is copied"); + break; + }
+ case COPY_DICT_OR_SNAPSHOT: { + String item = (String) opt.params[0]; + + if (item.toLowerCase().endsWith(".dict")) { + DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig); + DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig); + DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item); + + long ts = dictSrc.getLastModified(); + dictSrc.setLastModified(0);//to avoid resource store write conflict + DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc); + dictSrc.setLastModified(ts); + + if (dictSaved == dictSrc) { + //no dup found, already saved to dest + logger.info("Item " + item + " is copied"); + } else { + //dictSrc is rejected because of duplication + //modify cube's dictionary path + String cubeName = (String) opt.params[1]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + for (CubeSegment segment : cube.getSegments()) { + for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { + if (entry.getValue().equalsIgnoreCase(item)) { + entry.setValue(dictSaved.getResourcePath()); + } } } + dstStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused"); } - dstStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused"); - } - - } else if (item.toLowerCase().endsWith(".snapshot")) { - SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig); - SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig); - SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item); - - long ts = snapSrc.getLastModified(); - snapSrc.setLastModified(0); - SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc); - snapSrc.setLastModified(ts); - - if (snapSaved == snapSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); - - } else { - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(snapSaved.getResourcePath()); + } else if (item.toLowerCase().endsWith(".snapshot")) { + SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig); + SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig); + SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item); + + long ts = snapSrc.getLastModified(); + snapSrc.setLastModified(0); + SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc); + snapSrc.setLastModified(ts); + + if (snapSaved == snapSrc) { + //no dup found, already saved to dest + logger.info("Item " + item + " is copied"); + + } else { + String cubeName = (String) opt.params[1]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + for (CubeSegment segment : cube.getSegments()) { + for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) { + if (entry.getValue().equalsIgnoreCase(item)) { + entry.setValue(snapSaved.getResourcePath()); + } } } + dstStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused"); + } - dstStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused"); } else { - logger.error("unknown item found: " + item); - logger.info("ignore it"); + logger.error("unknown item found: " + item); + logger.info("ignore it"); } - break; - }
- case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[0]; - String dstPath = (String) opt.params[1]; - renameHDFSPath(srcPath, dstPath); - logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); - break; - }
- case ADD_INTO_PROJECT: { - CubeInstance srcCube = (CubeInstance) opt.params[0]; - String cubeName = (String) opt.params[1]; - String projectName = (String) opt.params[2]; - String modelName = srcCube.getDescriptor().getModelName(); - - String projectResPath = ProjectInstance.concatResourcePath(projectName); - Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); - ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); - - project.addModel(modelName); - project.removeRealization(RealizationType.CUBE, cubeName); - project.addRealizationEntry(RealizationType.CUBE, cubeName); - - dstStore.putResource(projectResPath, project, projectSerializer); - logger.info("Project instance for " + projectName + " is corrected"); - break; - }
- case COPY_ACL: { - String cubeId = (String) opt.params[0]; - String modelId = (String) opt.params[1]; - String projectName = (String) opt.params[2]; - String projectResPath = ProjectInstance.concatResourcePath(projectName); - Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); - ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); - String projUUID = project.getUuid(); - HTableInterface srcAclHtable = null; - HTableInterface destAclHtable = null; - try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); - - // cube acl - Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); - if (result.listCells() != null) { - for (Cell cell : result.listCells()) { - byte[] family = CellUtil.cloneFamily(cell); - byte[] column = CellUtil.cloneQualifier(cell); - byte[] value = CellUtil.cloneValue(cell); - - // use the target project uuid as the parent - if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { - String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; - value = Bytes.toBytes(valueString);
+ case RENAME_FOLDER_IN_HDFS: { + String srcPath = (String) opt.params[0]; + String dstPath = (String) opt.params[1]; + hdfsFS.rename(new Path(srcPath), new Path(dstPath)); + logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); + break; + }
+ case ADD_INTO_PROJECT: { + String cubeName = (String) opt.params[0]; + String projectName = (String) opt.params[1]; + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); + project.removeRealization(RealizationType.CUBE, cubeName); + project.addRealizationEntry(RealizationType.CUBE, cubeName); + dstStore.putResource(projectResPath, project, projectSerializer); + logger.info("Project instance for " + projectName + " is corrected"); + break; + }
+ case COPY_ACL: { + String cubeId = (String) opt.params[0]; + String modelId = (String) opt.params[1]; + String projectName = (String) opt.params[2]; + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); + String projUUID = project.getUuid(); + Table srcAclHtable = null; + Table destAclHtable = null; + try { + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + + // cube acl + Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); + if (result.listCells() != null) { + for (Cell cell : result.listCells()) { + byte[] family = CellUtil.cloneFamily(cell); + byte[] column = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + + // use the target project uuid as the parent + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + value = Bytes.toBytes(valueString); + } + Put put = new Put(Bytes.toBytes(cubeId)); + put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)); + destAclHtable.put(put); } Put put = new Put(Bytes.toBytes(cubeId)); put.add(family, column, value); destAclHtable.put(put); } + destAclHtable.flushCommits(); + } finally { + IOUtils.closeQuietly(srcAclHtable); + IOUtils.closeQuietly(destAclHtable); } - destAclHtable.flushCommits(); - } finally { - IOUtils.closeQuietly(srcAclHtable); - IOUtils.closeQuietly(destAclHtable); + break; + }
+ case PURGE_AND_DISABLE: { + String cubeName = (String) opt.params[0]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + cube.getSegments().clear(); + cube.setStatus(RealizationStatusEnum.DISABLED); + srcStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); + + break; + }
+ default: { + //do nothing + break; + }
- break; - }
- case PURGE_AND_DISABLE: { - String cubeName = (String) opt.params[0]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); - cube.getSegments().clear(); - cube.setStatus(RealizationStatusEnum.DISABLED); - srcStore.putResource(cubeResPath, cube, cubeSerializer); - logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); - - break; - }
- default: { - //do nothing - break; - }
 } }
@@ -499,35 +496,35 @@
 logger.info("Undo operation: " + opt.toString());
 switch (opt.type) {
- case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - break; - }
- case COPY_FILE_IN_META: { - // no harm - logger.info("Undo for COPY_FILE_IN_META is ignored"); - break; - }
- case COPY_DICT_OR_SNAPSHOT: { - // no harm - logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); - break; - }
- case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[1]; - String dstPath = (String) opt.params[0];
+ case CHANGE_HTABLE_HOST: { + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); + desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); + break; + }
+ case COPY_FILE_IN_META: { + // no harm + logger.info("Undo for COPY_FILE_IN_META is ignored"); + break; + }
+ case COPY_DICT_OR_SNAPSHOT: { + // no harm + logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); + break; + }
+ case RENAME_FOLDER_IN_HDFS: { + String srcPath = (String) opt.params[1]; + String dstPath = (String) opt.params[0]; if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) { renameHDFSPath(srcPath, dstPath); logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); } break; - } + }
 case ADD_INTO_PROJECT: {
     logger.info("Undo for ADD_INTO_PROJECT is ignored");
     break;
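Two details of the COPY_ACL operation above recur throughout the migration: Put.add(family, qualifier, value) became Put.addColumn(...), and Table has no flushCommits(), so puts write through as they are issued. A generic sketch of copying one row between tables with the 1.x client (the identifiers are placeholders, not Kylin's ACL schema):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;

    public class CopyRowSketch {
        static void copyRow(Table src, Table dst, byte[] rowKey) throws IOException {
            Result result = src.get(new Get(rowKey));
            if (result.listCells() == null)
                return;
            Put put = new Put(rowKey);
            for (Cell cell : result.listCells()) {
                // addColumn replaces the removed Put.add(family, qualifier, value)
                put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
            }
            dst.put(put); // writes through; no flushCommits() on Table
        }
    }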
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
index 295750a..f6a28ba 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java
@@ -26,10 +26,10 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
@@ -60,7 +60,7 @@ public class CubeMigrationCheckCLI {
     private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube");
     private KylinConfig dstCfg;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
     private List<String> issueExistHTables;
     private List<String> inconsistentHTables;
@@ -128,9 +128,8 @@
     this.dstCfg = kylinConfig;
     this.ifFix = isFix;
-    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    hbaseAdmin = new HBaseAdmin(conf);
-
+    Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+    hbaseAdmin = conn.getAdmin();
     issueExistHTables = Lists.newArrayList();
     inconsistentHTables = Lists.newArrayList();
 }
@@ -186,11 +185,11 @@
     for (String segFullName : inconsistentHTables) {
         String[] sepNameList = segFullName.split(",");
         HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0]));
-        logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix());
-        hbaseAdmin.disableTable(sepNameList[0]);
+        logger.info("Change the host of htable "+sepNameList[0]+"belonging to cube "+sepNameList[1]+" from "+desc.getValue(IRealizationConstants.HTableTag)+" to "+dstCfg.getMetadataUrlPrefix());
+        hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0]));
         desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix());
-        hbaseAdmin.modifyTable(sepNameList[0], desc);
-        hbaseAdmin.enableTable(sepNameList[0]);
+        hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc);
+        hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0]));
     }
 } else {
     logger.info("------ Inconsistent HTables Needed To Be Fixed ------");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index a1193e7..442d86c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
@@ -76,7 +78,8 @@ public class DeployCoprocessorCLI {
     KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
     Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
     FileSystem fileSystem = FileSystem.get(hconf);
-    HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+    Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
+    Admin hbaseAdmin = conn.getAdmin();
     String localCoprocessorJar;
     if ("default".equals(args[0])) {
@@ -159,10 +162,10 @@ public class DeployCoprocessorCLI {
 public static void deployCoprocessor(HTableDescriptor tableDesc) {
     try {
         initHTableCoprocessor(tableDesc);
-        logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+        logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
     } catch (Exception ex) {
-        logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+        logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
         logger.error("Will try creating the table without coprocessor.");
     }
 }
@@ -184,9 +187,9 @@ public class DeployCoprocessorCLI {
     desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
 }
-public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
     logger.info("Disable " + tableName);
-    hbaseAdmin.disableTable(tableName);
+    hbaseAdmin.disableTable(TableName.valueOf(tableName));
     logger.info("Unset coprocessor on " + tableName);
     HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
@@ -214,13 +217,13 @@ public class DeployCoprocessorCLI {
     desc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
 }
-hbaseAdmin.modifyTable(tableName, desc);
+hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
 logger.info("Enable " + tableName);
-hbaseAdmin.enableTable(tableName);
+hbaseAdmin.enableTable(TableName.valueOf(tableName));
 }
-private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
     List<String> processed = new ArrayList<String>();
     for (String tableName : tableNames) {
@@ -331,7 +334,7 @@ public class DeployCoprocessorCLI {
     return coprocessorDir;
 }
-private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
     HashSet<String> result = new HashSet<String>();
     for (String tableName : tableNames) {
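resetCoprocessor above is a disable -> modify -> enable cycle, now expressed with Admin and TableName. A sketch of that cycle under the same API; the coprocessor class name is a placeholder, and the priority value is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    public class ResetCoprocessorSketch {
        static void reset(Admin admin, String table, Path jar) throws IOException {
            TableName tn = TableName.valueOf(table);
            admin.disableTable(tn); // schema changes require the table to be offline
            HTableDescriptor desc = admin.getTableDescriptor(tn);
            desc.addCoprocessor("org.example.FakeEndpoint", jar, 1001, null); // placeholder class
            admin.modifyTable(tn, desc);
            admin.enableTable(tn);
        }
    }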
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
index a5a85fa..d830276 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java
@@ -26,8 +26,9 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.KylinConfig;
@@ -231,9 +232,9 @@ public class ExtendCubeToHybridCLI {
     Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
     ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer);
     String projUUID = project.getUuid();
-    HTableInterface aclHtable = null;
+    Table aclHtable = null;
     try {
-        aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl");
+        aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl"));
         // cube acl
         Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId)));
@@ -253,7 +254,6 @@ public class ExtendCubeToHybridCLI {
             aclHtable.put(put);
         }
     }
-    aclHtable.flushCommits();
 } finally {
     IOUtils.closeQuietly(aclHtable);
 }
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index 86ba22f..8387792 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -28,9 +28,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -75,7 +75,7 @@ public class GridTableHBaseBenchmark {
     System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
     String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-    HConnection conn = HBaseConnection.get(hbaseUrl);
+    Connection conn = HBaseConnection.get(hbaseUrl);
     createHTableIfNeeded(conn, TEST_TABLE);
     prepareData(conn);
@@ -91,10 +91,10 @@ public class GridTableHBaseBenchmark {
 }
-private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
     Stats stats = new Stats("COLUMN_SCAN");
-    HTableInterface table = conn.getTable(TEST_TABLE);
+    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
     try {
         stats.markStart();
@@ -122,20 +122,20 @@ public class GridTableHBaseBenchmark {
     }
 }
-private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
     fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
 }
-private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
     jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
 }
-private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
     jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
 }
-private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-    HTableInterface table = conn.getTable(TEST_TABLE);
+private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
     try {
         stats.markStart();
@@ -156,11 +156,11 @@ public class GridTableHBaseBenchmark {
     }
 }
-private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
     final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
-    HTableInterface table = conn.getTable(TEST_TABLE);
+    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
     try {
         stats.markStart();
@@ -204,8 +204,8 @@ public class GridTableHBaseBenchmark {
     }
 }
-private static void prepareData(HConnection conn) throws IOException {
-    HTableInterface table = conn.getTable(TEST_TABLE);
+private static void prepareData(Connection conn) throws IOException {
+    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
     try {
         // check how many rows existing
@@ -258,8 +258,8 @@ public class GridTableHBaseBenchmark {
     return bytes;
 }
-private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
-    HBaseAdmin hbase = new HBaseAdmin(conn);
+private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+    Admin hbase = conn.getAdmin();
    try {
        boolean tableExist = false;
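The benchmark's read path shows the Connection-centric usage pattern: each helper takes the shared Connection, obtains a Table through Connection.getTable(TableName), and closes the Table when done while the Connection stays open. A sketch of that shape (the row-counting logic is illustrative, not the benchmark's actual measurement code):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class ScanSketch {
        static int countRows(Connection conn, String tableName) throws IOException {
            int n = 0;
            try (Table table = conn.getTable(TableName.valueOf(tableName));
                 ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result r : scanner) {
                    n += r.isEmpty() ? 0 : 1; // touch each row, as fullScan does
                }
            }
            return n;
        }
    }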
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
index 6749d6c..54a2ac1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java
@@ -29,6 +29,13 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -55,8 +62,8 @@ public class HBaseClean extends AbstractApplication {
 private void cleanUp() {
     try {
         // get all kylin hbase tables
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = Lists.newArrayList();
@@ -71,12 +78,12 @@ public class HBaseClean extends AbstractApplication {
         // drop tables
         for (String htableName : allTablesNeedToBeDropped) {
             logger.info("Deleting HBase table " + htableName);
-            if (hbaseAdmin.tableExists(htableName)) {
-                if (hbaseAdmin.isTableEnabled(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
+            if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
                 }
-                hbaseAdmin.deleteTable(htableName);
+                hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                 logger.info("Deleted HBase table " + htableName);
             } else {
                 logger.info("HBase table" + htableName + " does not exist");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
index 346c3a2..58aa8fd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -20,22 +20,11 @@ package org.apache.kylin.storage.hbase.util;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
@@ -57,30 +46,31 @@ public class HBaseRegionSizeCalculator {
 /**
  * Computes size of each region for table and given column families.
  * */
-public HBaseRegionSizeCalculator(HTable table) throws IOException {
-    this(table, new HBaseAdmin(table.getConfiguration()));
-}
-
-/** Constructor for unit testing */
-HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
+    Table table = null;
+    Admin admin = null;
     try {
+        table = hbaseConnection.getTable(TableName.valueOf(tableName));
+        admin = hbaseConnection.getAdmin();
+
         if (!enabled(table.getConfiguration())) {
             logger.info("Region size calculation disabled.");
             return;
         }
-        logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+        logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
         // Get regions for table.
-        Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+        RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+        List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
        Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-       for (HRegionInfo regionInfo : tableRegionInfos) {
-           tableRegions.add(regionInfo.getRegionName());
+       for (HRegionLocation hRegionLocation : regionLocationList) {
+           tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
        }
-       ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+       ClusterStatus clusterStatus = admin.getClusterStatus();
        Collection<ServerName> servers = clusterStatus.getServers();
        final long megaByte = 1024L * 1024L;
@@ -104,7 +94,7 @@ public class HBaseRegionSizeCalculator {
         }
     }
 } finally {
-    hBaseAdmin.close();
+    admin.close();
 }
 }
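The calculator's biggest change is structural: HTable.getRegionLocations() is gone, and region enumeration now goes through a RegionLocator obtained from the Connection. A sketch of just that enumeration:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    public class RegionListSketch {
        static void printRegions(Connection conn, String tableName) throws IOException {
            try (RegionLocator locator = conn.getRegionLocator(TableName.valueOf(tableName))) {
                List<HRegionLocation> locations = locator.getAllRegionLocations();
                for (HRegionLocation loc : locations) {
                    System.out.println(loc.getRegionInfo().getRegionNameAsString());
                }
            }
        }
    }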
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
index 266f7e7..1351492 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java
@@ -25,12 +25,16 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 public class HBaseUsage {
@@ -42,8 +46,8 @@ public class HBaseUsage {
     Map<String, List<String>> envs = Maps.newHashMap();
     // get all kylin hbase tables
-    Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-    HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+    Admin hbaseAdmin = conn.getAdmin();
     String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
     HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
     for (HTableDescriptor desc : tableDescriptors) {
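HBaseUsage, HBaseClean, and OrphanHBaseCleanJob all rely on the same pattern-based listing, which survives the migration unchanged on Admin. A sketch; the "KYLIN_" literal is a stand-in for IRealizationConstants.SharedHbaseStorageLocationPrefix:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.Admin;

    public class ListTablesSketch {
        static void listKylinTables(Admin admin) throws IOException {
            // listTables(String) treats its argument as a regex over table names
            for (HTableDescriptor desc : admin.listTables("KYLIN_.*")) {
                System.out.println(desc.getNameAsString());
            }
        }
    }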
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
index ca1a060..d2b4ff3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -30,6 +30,12 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +56,8 @@ public class HtableAlterMetadataCLI extends AbstractApplication {
     String metadataValue;
     private void alter() throws IOException {
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
         hbaseAdmin.disableTable(table.getTableName());
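The alter flow itself is unchanged by the migration: fetch the descriptor, disable the table, modify it, re-enable it. Only the handle types move to Admin and TableName. A minimal sketch of that cycle, assuming a hypothetical tag key, value and table name:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class AlterTagSketch {
        public static void main(String[] args) throws IOException {
            TableName tn = TableName.valueOf("KYLIN_EXAMPLE"); // hypothetical table
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    Admin admin = conn.getAdmin()) {
                HTableDescriptor desc = admin.getTableDescriptor(tn);
                desc.setValue("SOME_TAG", "new-value"); // hypothetical metadata key/value
                // descriptor changes still require the disable/modify/enable cycle
                admin.disableTable(tn);
                admin.modifyTable(tn, desc);
                admin.enableTable(tn);
            }
        }
    }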
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
index 8ff5b0f..1ee3f99 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -33,7 +33,15 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +62,8 @@ public class OrphanHBaseCleanJob extends AbstractApplication {
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -73,12 +82,12 @@ public class OrphanHBaseCleanJob extends AbstractApplication {
         // drop tables
         for (String htableName : allTablesNeedToBeDropped) {
             logger.info("Deleting HBase table " + htableName);
-            if (hbaseAdmin.tableExists(htableName)) {
-                if (hbaseAdmin.isTableEnabled(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
+            if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
                 }
-                hbaseAdmin.deleteTable(htableName);
+                hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                 logger.info("Deleted HBase table " + htableName);
             } else {
                 logger.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index 58ef7cb..b86b561 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -21,9 +21,10 @@ package org.apache.kylin.storage.hbase.util;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -57,12 +58,12 @@ public class PingHBaseCLI {
         Scan scan = new Scan();
         int limit = 20;
-        HConnection conn = null;
-        HTableInterface table = null;
+        Connection conn = null;
+        Table table = null;
         ResultScanner scanner = null;
         try {
-            conn = HConnectionManager.createConnection(hconf);
-            table = conn.getTable(hbaseTable);
+            conn = ConnectionFactory.createConnection(hconf);
+            table = conn.getTable(TableName.valueOf(hbaseTable));
             scanner = table.getScanner(scan);
             int count = 0;
             for (Result r : scanner) {
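On the read side the swap is just as mechanical: ConnectionFactory.createConnection plus a TableName-keyed getTable, with Table and ResultScanner both closeable. A small sketch of the equivalent scan loop; the table name is made up, and try-with-resources stands in for the explicit null-checked finally blocks the CLI uses:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanSketch {
        public static void main(String[] args) throws IOException {
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    Table table = conn.getTable(TableName.valueOf("KYLIN_EXAMPLE")); // hypothetical
                    ResultScanner scanner = table.getScanner(new Scan())) {
                int count = 0;
                for (Result r : scanner) {
                    if (++count > 20) { // stop after a small sample, like the CLI's limit
                        break;
                    }
                    System.out.println(Bytes.toStringBinary(r.getRow()));
                }
            }
        }
    }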
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index 01edb1f..a854973 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.kylin.common.util.Bytes;
@@ -70,8 +71,8 @@ public class RowCounterCLI {
     logger.info("My Scan " + scan.toString());
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 4bd2c53..b8afea1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -41,6 +41,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -54,6 +58,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +77,8 @@ public class StorageCleanupJob extends AbstractApplication {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold();
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
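Both cleanup jobs converge on the same drop sequence, with every String table name wrapped in TableName.valueOf. A hedged sketch of that sequence in isolation; the connection setup is reduced to the plain factory and the table name is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class DropTableSketch {
        public static void main(String[] args) throws IOException {
            TableName tn = TableName.valueOf("KYLIN_ORPHAN_TABLE"); // illustrative name
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    Admin admin = conn.getAdmin()) {
                if (admin.tableExists(tn)) {
                    if (admin.isTableEnabled(tn)) {
                        admin.disableTable(tn); // a table must be disabled before deletion
                    }
                    admin.deleteTable(tn);
                }
            }
        }
    }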
@@ -160,12 +166,12 @@ public class StorageCleanupJob extends AbstractApplication {
             public Object call() throws Exception {
                 logger.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    if (hbaseAdmin.isTableEnabled(htableName)) {
-                        hbaseAdmin.disableTable(htableName);
+                if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                    if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                        hbaseAdmin.disableTable(TableName.valueOf(htableName));
                     }
-                    hbaseAdmin.deleteTable(htableName);
+                    hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                     logger.info("Deleted HBase table " + htableName);
                 } else {
                     logger.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
index e36f662..ff5694a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
@@ -24,16 +24,20 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,14 +53,15 @@ public class UpdateHTableHostCLI {
     private List<String> errorMsgs = Lists.newArrayList();
     private List<String> htables;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
     private KylinConfig kylinConfig;
     private String oldHostValue;
     public UpdateHTableHostCLI(List<String> htables, String oldHostValue) throws IOException {
         this.htables = htables;
         this.oldHostValue = oldHostValue;
-        this.hbaseAdmin = new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration());
+        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
+        hbaseAdmin = conn.getAdmin();
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
     }
@@ -166,9 +171,9 @@ public class UpdateHTableHostCLI {
         HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
         if (oldHostValue.equals(desc.getValue(IRealizationConstants.HTableTag))) {
             desc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-            hbaseAdmin.disableTable(tableName);
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             updatedResources.add(tableName);
         }
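In StorageCleanupJob the drop runs inside a Callable, so the job can push each deletion onto an executor and bound it with a timeout. A rough sketch of that shape under the new API; the table names and pool size are invented, and the real job's scheduling details are not reproduced here:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ParallelDropSketch {
        public static void main(String[] args) throws Exception {
            List<String> htables = Arrays.asList("KYLIN_A", "KYLIN_B"); // hypothetical names
            ExecutorService pool = Executors.newFixedThreadPool(2);
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                    Admin admin = conn.getAdmin()) {
                for (final String htableName : htables) {
                    // the try-with-resources variable 'admin' is implicitly final,
                    // so the anonymous Callable may capture it
                    final Admin a = admin;
                    Future<?> f = pool.submit(new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            TableName tn = TableName.valueOf(htableName);
                            if (a.tableExists(tn)) {
                                if (a.isTableEnabled(tn)) {
                                    a.disableTable(tn);
                                }
                                a.deleteTable(tn);
                            }
                            return null;
                        }
                    });
                    f.get(60, TimeUnit.SECONDS); // bound each drop with a timeout
                }
            } finally {
                pool.shutdown();
            }
        }
    }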
http://git-wip-us.apache.org/repos/asf/kylin/blob/a6735c90/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index 390930a..7f3acc1 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -29,12 +29,10 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -230,15 +228,8 @@ public class AggregateRegionObserverTest {
             return nextRaw(results);
         }
-        /*
-         * (non-Javadoc)
-         *
-         * @see
-         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
-         * .List, int)
-         */
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
@@ -307,6 +298,11 @@ public class AggregateRegionObserverTest {
             return 0;
         }
+        @Override
+        public int getBatch() {
+            return 0;
+        }
+
         /*
          * (non-Javadoc)
          *
@@ -323,16 +319,9 @@ public class AggregateRegionObserverTest {
             return i < input.size();
         }
-        /*
-         * (non-Javadoc)
-         *
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
-         * .List, int)
-         */
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-            return nextRaw(result);
+        public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
+            return false;
        }
     }
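The interface change driving this test update is that InternalScanner and RegionScanner dropped the int-limit overloads of next()/nextRaw() in favor of ScannerContext, and RegionScanner gained getBatch(). As a reference point, a minimal no-op stub compiling against the HBase 1.x (1.1-era) interfaces might look like the following; this is a sketch, not the test's actual mock, and the exact method set can vary by minor version:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    /** Minimal no-op RegionScanner stub against the HBase 1.x interface (sketch). */
    public class EmptyRegionScanner implements RegionScanner {
        @Override
        public boolean next(List<Cell> results) throws IOException {
            return false; // nothing to emit
        }

        @Override
        public boolean next(List<Cell> results, ScannerContext ctx) throws IOException {
            return next(results); // ScannerContext replaces the old int limit
        }

        @Override
        public boolean nextRaw(List<Cell> results) throws IOException {
            return false;
        }

        @Override
        public boolean nextRaw(List<Cell> results, ScannerContext ctx) throws IOException {
            return nextRaw(results);
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public HRegionInfo getRegionInfo() {
            return null; // a real scanner returns its region's info
        }

        @Override
        public boolean isFilterDone() throws IOException {
            return true;
        }

        @Override
        public boolean reseek(byte[] row) throws IOException {
            return false;
        }

        @Override
        public long getMaxResultSize() {
            return -1;
        }

        @Override
        public long getMvccReadPoint() {
            return 0;
        }

        @Override
        public int getBatch() {
            return 0; // new in the 1.x RegionScanner interface
        }
    }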