merge 1.5.3
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/15c1d3c7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/15c1d3c7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/15c1d3c7 Branch: refs/heads/1.5.x-HBase1.x Commit: 15c1d3c73540381b629da11146a21515e1ee39e2 Parents: c509efb Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Jul 25 20:12:00 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Jul 25 20:12:00 2016 +0800 ---------------------------------------------------------------------- .../kylin/provision/BuildCubeWithEngine.java | 3 +- .../rest/security/MockAclHBaseStorage.java | 4 +- .../apache/kylin/rest/service/CubeService.java | 2 + .../apache/kylin/rest/service/QueryService.java | 7 +- .../apache/kylin/rest/service/UserService.java | 14 +--- .../kylin/storage/hbase/HBaseConnection.java | 74 ++++++++++++++++---- .../kylin/storage/hbase/HBaseResourceStore.java | 23 ++---- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../storage/hbase/steps/HBaseCuboidWriter.java | 1 - .../storage/hbase/util/CubeMigrationCLI.java | 22 ++---- .../storage/hbase/util/StorageCleanupJob.java | 5 +- 11 files changed, 85 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 4cee1ed..2a5979f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -58,6 +58,7 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -419,7 +420,7 @@ public class BuildCubeWithEngine { } private void checkHFilesInHBase(CubeSegment segment) throws IOException { - Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); String tableName = segment.getStorageLocationIdentifier(); HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java index 492c176..16d6f9f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java +++ b/server-base/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java @@ -34,8 +34,8 @@ public class MockAclHBaseStorage implements AclHBaseStorage { private static final String aclTableName = "MOCK-ACL-TABLE"; private static final String userTableName = "MOCK-USER-TABLE"; - private HTableInterface mockedAclTable; - private HTableInterface mockedUserTable; + private Table mockedAclTable; + private Table mockedUserTable; private RealAclHBaseStorage realAcl; public MockAclHBaseStorage() { http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 7f35051..6e3d32d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.WeakHashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; import java.util.*; import org.apache.kylin.common.KylinConfig; @@ -65,6 +66,7 @@ import org.apache.kylin.rest.security.AclPermission; import org.apache.kylin.source.hive.HiveSourceTableLoader; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 783616d..d095f2b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -46,7 +46,6 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -184,14 +183,10 @@ public class QueryService extends BasicService { List<Query> queries = new ArrayList<Query>(); Table htable = null; try { -<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java - HConnection conn = HBaseConnection.get(hbaseUrl); + org.apache.hadoop.hbase.client.Connection conn = HBaseConnection.get(hbaseUrl); HBaseConnection.createHTableIfNeeded(conn, userTableName, USER_QUERY_FAMILY); - htable = conn.getTable(userTableName); -======= htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); ->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/QueryService.java Get get = new Get(Bytes.toBytes(creator)); get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY)); Result result = htable.get(get); http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java index 64c2c7d..e039534 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java @@ -146,17 +146,12 @@ public class UserService implements UserDetailsManager { public void updateUser(UserDetails user) { Table htable = null; try { - byte[] userAuthorities = serialize(user.getAuthorities()); htable = aclHBaseStorage.getTable(userTableName); -<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java Pair<byte[], byte[]> pair = userToHBaseRow(user); Put put = new Put(pair.getKey()); - put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond()); -======= - Put put = new Put(Bytes.toBytes(user.getUsername())); - put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities); ->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java + + put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), pair.getSecond()); htable.put(put); } catch (IOException e) { @@ -219,13 +214,8 @@ public class UserService implements UserDetailsManager { Scan s = new Scan(); s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN)); -<<<<<<< b6296775f7793a63baf7f6a97cf4c0759d654341:server-base/src/main/java/org/apache/kylin/rest/service/UserService.java List<UserDetails> all = new ArrayList<UserDetails>(); - HTableInterface htable = null; -======= - List<String> authorities = new ArrayList<String>(); Table htable = null; ->>>>>>> KYLIN-1528 Create a branch for v1.5 with HBase 1.x API:server/src/main/java/org/apache/kylin/rest/service/UserService.java ResultScanner scanner = null; try { htable = aclHBaseStorage.getTable(userTableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index 05170a0..e7ee2f5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -21,7 +21,7 @@ package org.apache.kylin.storage.hbase; import java.io.IOException; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Threads; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.engine.mr.HadoopUtil; @@ -51,14 +52,20 @@ public class HBaseConnection { private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); - private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>(); - private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>(); + private static final Map<String, Configuration> configCache = new ConcurrentHashMap<String, Configuration>(); + private static final Map<String, Connection> connPool = new ConcurrentHashMap<String, Connection>(); + + private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>(); + + private static ExecutorService coprocessorPool = null; static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (Connection conn : ConnPool.values()) { + closeCoprocessorPool(); + + for (Connection conn : connPool.values()) { try { conn.close(); } catch (IOException e) { @@ -68,19 +75,62 @@ public class HBaseConnection { } }); } + + public static ExecutorService getCoprocessorPool() { + if (coprocessorPool != null) { + return coprocessorPool; + } + + synchronized (HBaseConnection.class) { + if (coprocessorPool != null) { + return coprocessorPool; + } + + KylinConfig config = KylinConfig.getInstanceFromEnv(); + + // copy from HConnectionImplementation.getBatchPool() + int maxThreads = config.getHBaseMaxConnectionThreads(); + int coreThreads = config.getHBaseCoreConnectionThreads(); + long keepAliveTime = config.getHBaseConnectionThreadPoolAliveSeconds(); + LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads * 100); + ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, // + Threads.newDaemonThreadFactory("kylin-coproc-")); + tpe.allowCoreThreadTimeOut(true); + + logger.info("Creating coprocessor thread pool with max of {}, core of {}", maxThreads, coreThreads); + + coprocessorPool = tpe; + return coprocessorPool; + } + } + + private static void closeCoprocessorPool() { + if (coprocessorPool == null) + return; + + coprocessorPool.shutdown(); + try { + if (!coprocessorPool.awaitTermination(10, TimeUnit.SECONDS)) { + coprocessorPool.shutdownNow(); + } + } catch (InterruptedException e) { + coprocessorPool.shutdownNow(); + } + } + public static void clearConnCache() { - ConnPool.clear(); + connPool.clear(); } private static final ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>(); public static Configuration getCurrentHBaseConfiguration() { - if (hbaseConfig.get() == null) { + if (configThreadLocal.get() == null) { String storageUrl = KylinConfig.getInstanceFromEnv().getStorageUrl(); - hbaseConfig.set(newHBaseConfiguration(storageUrl)); + configThreadLocal.set(newHBaseConfiguration(storageUrl)); } - return hbaseConfig.get(); + return configThreadLocal.get(); } private static Configuration newHBaseConfiguration(String url) { @@ -128,20 +178,20 @@ public class HBaseConnection { @SuppressWarnings("resource") public static Connection get(String url) { // find configuration - Configuration conf = ConfigCache.get(url); + Configuration conf = configCache.get(url); if (conf == null) { conf = newHBaseConfiguration(url); - ConfigCache.put(url, conf); + configCache.put(url, conf); } - Connection connection = ConnPool.get(url); + Connection connection = connPool.get(url); try { while (true) { // I don't use DCL since recreate a connection is not a big issue. if (connection == null || connection.isClosed()) { logger.info("connection is null or closed, creating a new one"); connection = ConnectionFactory.createConnection(conf); - ConnPool.put(url, connection); + connPool.put(url, connection); } if (connection == null || connection.isClosed()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index f988dea..aa7a4d4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -32,14 +32,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -286,7 +281,6 @@ public class HBaseResourceStore extends ResourceStore { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); - table.flushCommits(); if (hdfsResourceExist) { // remove hdfs cell value Path redirectPath = bigCellHDFSPath(resPath); @@ -308,7 +302,7 @@ public class HBaseResourceStore extends ResourceStore { } private Result getFromHTable(String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { return internalGetFromHTable(table, path, fetchContent, fetchTimestamp); } finally { @@ -316,7 +310,7 @@ public class HBaseResourceStore extends ResourceStore { } } - private Result internalGetFromHTable(HTableInterface table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { + private Result internalGetFromHTable(Table table, String path, boolean fetchContent, boolean fetchTimestamp) throws IOException { byte[] rowkey = Bytes.toBytes(path); Get get = new Get(rowkey); @@ -330,14 +324,9 @@ public class HBaseResourceStore extends ResourceStore { get.addColumn(B_FAMILY, B_COLUMN_TS); } - Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); - try { - Result result = table.get(get); - boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); - return exists ? result : null; - } finally { - IOUtils.closeQuietly(table); - } + Result result = table.get(get); + boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); + return exists ? result : null; } private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 830aca7..d84074f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -360,7 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final boolean[] abnormalFinish = new boolean[1]; try { - HTableInterface table = conn.get(cubeSeg.getStorageLocationIdentifier(), HBaseConnection.getCoprocessorPool()); + Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool()); final CubeVisitRequest request = builder.build(); final byte[] startKey = epRange.getFirst(); http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index 16955dd..c990379 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -136,7 +136,6 @@ public class HBaseCuboidWriter implements ICuboidWriter { logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); puts.clear(); } - logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); puts.clear(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 6b63e66..dfb7c78 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -27,16 +27,8 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; @@ -45,7 +37,6 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -463,11 +454,7 @@ public class CubeMigrationCLI { put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)); destAclHtable.put(put); } - Put put = new Put(Bytes.toBytes(cubeId)); - put.add(family, column, value); - destAclHtable.put(put); } - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(srcAclHtable); IOUtils.closeQuietly(destAclHtable); @@ -533,13 +520,12 @@ public class CubeMigrationCLI { case COPY_ACL: { String cubeId = (String) opt.params[0]; String modelId = (String) opt.params[1]; - HTableInterface destAclHtable = null; + Table destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(destAclHtable); } http://git-wip-us.apache.org/repos/asf/kylin/blob/15c1d3c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index d7f49df..874121d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.CliCommandExecutor; @@ -167,10 +168,10 @@ public class StorageCleanupJob extends AbstractHadoopJob { } class DeleteHTableRunnable implements Callable { - HBaseAdmin hbaseAdmin; + Admin hbaseAdmin; String htableName; - DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) { + DeleteHTableRunnable(Admin hbaseAdmin, String htableName) { this.hbaseAdmin = hbaseAdmin; this.htableName = htableName; }