http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/monitor/pom.xml ---------------------------------------------------------------------- diff --git a/monitor/pom.xml b/monitor/pom.xml index 820934f..399535d 100644 --- a/monitor/pom.xml +++ b/monitor/pom.xml @@ -39,6 +39,12 @@ <dependencies> <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope>
http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java ---------------------------------------------------------------------- diff --git a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java index 97200fc..94b3937 100644 --- a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java +++ b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java @@ -20,18 +20,21 @@ package org.apache.kylin.monitor; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.log4j.Logger; /** @@ -122,11 +125,10 @@ public class MonitorMetaManager { public static String getListWithRowkey(String table, String rowkey) throws IOException { Result result = getResultByRowKey(table, rowkey); String fileList = null; - if (result.list() != null) { - for (KeyValue kv : result.list()) { - fileList = Bytes.toString(kv.getValue()); + if (result.listCells() != null) { + for (Cell cell : result.listCells()) { + fileList = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } - } 
fileList = fileList == null ? "" : fileList; return fileList; @@ -164,16 +166,20 @@ public class MonitorMetaManager { * create table in hbase */ public static void creatTable(String tableName, String[] family) throws Exception { - HBaseAdmin admin = new HBaseAdmin(conf); - HTableDescriptor desc = new HTableDescriptor(tableName); - for (int i = 0; i < family.length; i++) { - desc.addFamily(new HColumnDescriptor(family[i])); - } - if (admin.tableExists(tableName)) { - logger.info("table Exists!"); - } else { - admin.createTable(desc); - logger.info("create table Success!"); + Admin admin = HBaseConnection.get().getAdmin(); + try { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + for (int i = 0; i < family.length; i++) { + desc.addFamily(new HColumnDescriptor(family[i])); + } + if (admin.tableExists(TableName.valueOf(tableName))) { + logger.info("table Exists!"); + } else { + admin.createTable(desc); + logger.info("create table Success!"); + } + } finally { + IOUtils.closeQuietly(admin); } } @@ -181,13 +187,15 @@ public class MonitorMetaManager { * update cell in hbase */ public static void updateData(String tableName, String rowKey, String family, String column, String value) throws IOException { - HTable table = new HTable(conf, Bytes.toBytes(tableName)); + Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName)); Put put = new Put(rowKey.getBytes()); - put.add(family.getBytes(), column.getBytes(), value.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), value.getBytes()); try { table.put(put); } catch (IOException e) { e.printStackTrace(); + } finally { + IOUtils.closeQuietly(table); } logger.info("update table [" + tableName + "]"); logger.info("rowKey [" + rowKey + "]"); @@ -200,9 +208,10 @@ public class MonitorMetaManager { * get result by rowkey */ public static Result getResultByRowKey(String tableName, String rowKey) throws IOException { - HTable table = new HTable(conf, 
Bytes.toBytes(tableName)); + Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); + IOUtils.closeQuietly(table); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7cb5847..eaa23b1 100644 --- a/pom.xml +++ b/pom.xml @@ -45,12 +45,13 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- Hadoop versions --> - <hadoop2.version>2.6.0</hadoop2.version> - <yarn.version>2.6.0</yarn.version> + <hadoop2.version>2.7.1</hadoop2.version> + <yarn.version>2.7.1</yarn.version> <zookeeper.version>3.4.6</zookeeper.version> - <hive.version>0.14.0</hive.version> - <hive-hcatalog.version>0.14.0</hive-hcatalog.version> - <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version> + <hive.version>1.2.1</hive.version> + <hive-hcatalog.version>1.2.1</hive-hcatalog.version> + <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version> + <curator.version>2.7.1</curator.version> <!-- Dependency versions --> <antlr.version>3.4</antlr.version> @@ -90,9 +91,6 @@ <!-- Calcite Version --> <calcite.version>1.4.0-incubating</calcite.version> - <!-- Curator.version Version --> - <curator.version>2.6.0</curator.version> - <!-- Sonar --> <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin> <sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis> http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/server/src/main/java/org/apache/kylin/rest/service/AclService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java index ea2a48e..8a1cf6d 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java +++ 
b/server/src/main/java/org/apache/kylin/rest/service/AclService.java @@ -29,13 +29,14 @@ import java.util.Map; import java.util.NavigableMap; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; @@ -130,9 +131,9 @@ public class AclService implements MutableAclService { @Override public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) { List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>(); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Scan scan = new Scan(); SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity))); @@ -179,10 +180,10 @@ public class AclService implements MutableAclService { @Override public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException { Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>(); - HTableInterface htable = null; + Table htable = null; Result result = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); for (ObjectIdentity oid : oids) { 
result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier())))); @@ -231,16 +232,15 @@ public class AclService implements MutableAclService { Authentication auth = SecurityContextHolder.getContext().getAuthentication(); PrincipalSid sid = new PrincipalSid(auth); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " created successfully."); } catch (IOException e) { @@ -254,9 +254,9 @@ public class AclService implements MutableAclService { @Override public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); List<ObjectIdentity> 
children = findChildren(objectIdentity); @@ -269,7 +269,6 @@ public class AclService implements MutableAclService { } htable.delete(delete); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " deleted successfully."); } catch (IOException e) { @@ -287,27 +286,26 @@ public class AclService implements MutableAclService { throw e; } - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); - delete.deleteFamily(Bytes.toBytes(ACL_ACES_FAMILY)); + delete.addFamily(Bytes.toBytes(ACL_ACES_FAMILY)); htable.delete(delete); Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); if (null != acl.getParentAcl()) { - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); } for (AccessControlEntry ace : acl.getEntries()) { AceInfo aceInfo = new AceInfo(ace); - put.add(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); + put.addColumn(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); } if (!put.isEmpty()) { htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully."); } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/server/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git 
a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index b8de4d4..37fb770 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -29,9 +29,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HBaseRegionSizeCalculator; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; @@ -401,33 +401,24 @@ public class CubeService extends BasicService { * @throws IOException Exception when HTable resource is not closed correctly. */ public HBaseResponse getHTableInfo(String tableName) throws IOException { - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - HTable table = null; + Connection conn = HBaseConnection.get(); HBaseResponse hr = null; long tableSize = 0; int regionCount = 0; - try { - table = new HTable(hconf, tableName); - - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); - Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); + Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); - for (long s : sizeMap.values()) { - tableSize += s; - } + for (long s : sizeMap.values()) { + tableSize += s; + } - regionCount = sizeMap.size(); + regionCount = sizeMap.size(); - // Set response. - hr = new HBaseResponse(); - hr.setTableSize(tableSize); - hr.setRegionCount(regionCount); - } finally { - if (null != table) { - table.close(); - } - } + // Set response. 
+ hr = new HBaseResponse(); + hr.setTableSize(tableSize); + hr.setRegionCount(regionCount); return hr; } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/server/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 764df4b..7d14021 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -42,10 +42,11 @@ import javax.sql.DataSource; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.persistence.HBaseConnection; @@ -124,14 +125,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -157,14 +157,13 @@ public class QueryService extends BasicService { 
Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -176,9 +175,9 @@ public class QueryService extends BasicService { } List<Query> queries = new ArrayList<Query>(); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Get get = new Get(Bytes.toBytes(creator)); get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY)); Result result = htable.get(get); http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/server/src/main/java/org/apache/kylin/rest/service/UserService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java index d665ab9..d03cd55 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java @@ -25,13 +25,14 @@ import java.util.Collection; import java.util.List; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; @@ -75,9 +76,9 @@ public class UserService implements UserManager { @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Get get = new Get(Bytes.toBytes(username)); get.addFamily(Bytes.toBytes(USER_AUTHORITY_FAMILY)); @@ -106,15 +107,14 @@ public class UserService implements UserManager { @Override public void updateUser(UserDetails user) { - HTableInterface htable = null; + Table htable = null; try { byte[] userAuthorities = serialize(user.getAuthorities()); - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(user.getUsername())); - put.add(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities); + put.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities); htable.put(put); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -124,13 +124,12 @@ public class UserService implements UserManager { @Override public void deleteUser(String username) { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Delete delete = new Delete(Bytes.toBytes(username)); 
htable.delete(delete); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -145,9 +144,9 @@ public class UserService implements UserManager { @Override public boolean userExists(String username) { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Result result = htable.get(new Get(Bytes.toBytes(username))); return null != result && !result.isEmpty(); @@ -164,10 +163,10 @@ public class UserService implements UserManager { s.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN)); List<String> authorities = new ArrayList<String>(); - HTableInterface htable = null; + Table htable = null; ResultScanner scanner = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); scanner = htable.getScanner(s); for (Result result = scanner.next(); result != null; result = scanner.next()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java index f529145..4aeb676 100644 --- a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java +++ b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java @@ -25,7 +25,6 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; - import 
org.roaringbitmap.RoaringBitmap; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java index 9627efb..e4329f2 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java @@ -28,16 +28,16 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.apache.kylin.common.util.Array; @@ -84,7 +84,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private final List<RowValueDecoder> rowValueDecoders; private final StorageContext context; private final String tableName; - private final HTableInterface table; + private final Table table; private final RowKeyDecoder rowKeyDecoder; private final Iterator<HBaseKeyRange> rangeIterator; @@ -105,7 +105,7 @@ public class CubeSegmentTupleIterator implements 
ITupleIterator { private int advMeasureRowsRemaining; private int advMeasureRowIndex; - public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { this.cube = cubeSeg.getCubeInstance(); this.cubeSeg = cubeSeg; this.dimensions = dimensions; @@ -127,7 +127,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { try { - this.table = conn.getTable(tableName); + this.table = conn.getTable(TableName.valueOf(tableName)); } catch (Throwable t) { throw new StorageException("Error when open connection to table " + tableName, t); } @@ -144,9 +144,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator { if (logger.isDebugEnabled() && scan != null) { logger.debug("Scan " + scan.toString()); - byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); - if (metricsBytes != null) { - ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes); + ScanMetrics scanMetrics = scan.getScanMetrics(); + if (scanMetrics != null) { logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries }); } } @@ -303,7 +302,7 @@ public class CubeSegmentTupleIterator 
implements ITupleIterator { private Scan buildScan(HBaseKeyRange keyRange) { Scan scan = new Scan(); tuneScanParameters(scan); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + scan.setScanMetricsEnabled(true); for (RowValueDecoder valueDecoder : this.rowValueDecoders) { HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn(); byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java index 626b784..7510dcd 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Dictionary; @@ -142,7 +142,7 @@ public class CubeStorageEngine implements IStorageEngine { setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); - HConnection conn = HBaseConnection.get(context.getConnUrl()); + Connection conn = HBaseConnection.get(context.getConnUrl()); return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context); } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java 
---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java index 918fd4b..6a76baa 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java @@ -1,93 +1,94 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.invertedindex.model.IIDesc; - -/** - * @author yangli9 - * - */ -public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable { - - byte[] family; - byte[] qualifier; - - HTableInterface table; - ResultScanner scanner; - Iterator<Result> iterator; - - public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException { - this.family = family; - this.qualifier = qualifier; - - this.table = hconn.getTable(tableName); - this.scanner = table.getScanner(family, qualifier); - this.iterator = scanner.iterator(); - } - - @Override - public void close() { - IOUtils.closeQuietly(scanner); - IOUtils.closeQuietly(table); - } - - @Override - public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() { - return new MyIterator(); - } - - private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> { - - ImmutableBytesWritable key = new ImmutableBytesWritable(); - ImmutableBytesWritable value = new ImmutableBytesWritable(); - Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value); - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() { - Result r = iterator.next(); - Cell c = 
r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES); - key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength()); - value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - return pair; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.hbase; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.invertedindex.model.IIDesc; + +/** + * @author yangli9 + * + */ +public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable { + + byte[] family; + byte[] qualifier; + + Table table; + ResultScanner scanner; + Iterator<Result> iterator; + + public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException { + this.family = family; + this.qualifier = qualifier; + + this.table = hconn.getTable(TableName.valueOf(tableName)); + this.scanner = table.getScanner(family, qualifier); + this.iterator = scanner.iterator(); + } + + @Override + public void close() { + IOUtils.closeQuietly(scanner); + IOUtils.closeQuietly(table); + } + + @Override + public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() { + return new MyIterator(); + } + + private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> { + + ImmutableBytesWritable key = new ImmutableBytesWritable(); + ImmutableBytesWritable value = new ImmutableBytesWritable(); + Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() { + Result r = 
iterator.next(); + Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES); + key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength()); + value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + return pair; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java index afb49c0..e518a4c 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java @@ -1,57 +1,57 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase; - -import java.util.ArrayList; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.kylin.common.persistence.HBaseConnection; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.storage.IStorageEngine; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator; - -/** - * @author yangli9 - */ -public class InvertedIndexStorageEngine implements IStorageEngine { - - private IISegment seg; - - public InvertedIndexStorageEngine(IIInstance ii) { - this.seg = ii.getFirstSegment(); - } - - @Override - public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) { - String tableName = seg.getStorageLocationIdentifier(); - - //HConnection is cached, so need not be closed - HConnection conn = HBaseConnection.get(context.getConnUrl()); - try { - return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn); - } catch (Throwable e) { - e.printStackTrace(); - throw new IllegalStateException("Error when connecting to II htable " + tableName, e); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase; + +import java.util.ArrayList; + +import org.apache.hadoop.hbase.client.Connection; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.tuple.ITupleIterator; +import org.apache.kylin.storage.IStorageEngine; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator; + +/** + * @author yangli9 + */ +public class InvertedIndexStorageEngine implements IStorageEngine { + + private IISegment seg; + + public InvertedIndexStorageEngine(IIInstance ii) { + this.seg = ii.getFirstSegment(); + } + + @Override + public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) { + String tableName = seg.getStorageLocationIdentifier(); + + // Connection is cached, so need not be closed + Connection conn = HBaseConnection.get(context.getConnUrl()); + try { + return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn); + } catch (Throwable e) { + e.printStackTrace(); + throw new IllegalStateException("Error when connecting to II htable " + tableName, e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java ---------------------------------------------------------------------- diff 
--git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java index d4e8529..4a9c574 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java @@ -1,88 +1,91 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; - -/** - * @author yangli9 - * - */ -public class PingHBaseCLI { - - public static void main(String[] args) throws IOException { - String hbaseTable = args[0]; - - System.out.println("Hello friend."); - - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - if (User.isHBaseSecurityEnabled(hconf)) { - try { - System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); - } catch (InterruptedException e) { - System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - } - } - - Scan scan = new Scan(); - int limit = 20; - - HConnection conn = null; - HTableInterface table = null; - ResultScanner scanner = null; - try { - conn = HConnectionManager.createConnection(hconf); - table = conn.getTable(hbaseTable); - scanner = table.getScanner(scan); - int count = 0; - for (Result r : scanner) { - byte[] rowkey = r.getRow(); - System.out.println(Bytes.toStringBinary(rowkey)); - count++; - if (count == limit) - break; - } - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - table.close(); - } - if (conn != null) { - 
conn.close(); - } - } - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.TokenUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; + +/** + * @author yangli9 + * + */ +public class PingHBaseCLI { + + public static void main(String[] args) throws IOException, InterruptedException { + String hbaseTable = args[0]; + + System.out.println("Hello friend."); + + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); + if (User.isHBaseSecurityEnabled(hconf)) { + Connection conn = 
ConnectionFactory.createConnection(hconf); + try { + UserProvider userProvider = UserProvider.instantiate(hconf); + TokenUtil.obtainAndCacheToken(conn, userProvider.create(UserGroupInformation.getCurrentUser())); + } finally { + conn.close(); + } + } + + Scan scan = new Scan(); + int limit = 20; + + Connection conn = null; + Table table = null; + ResultScanner scanner = null; + try { + conn = ConnectionFactory.createConnection(hconf); + table = conn.getTable(TableName.valueOf(hbaseTable)); + scanner = table.getScanner(scan); + int count = 0; + for (Result r : scanner) { + byte[] rowkey = r.getRow(); + System.out.println(Bytes.toStringBinary(rowkey)); + count++; + if (count == limit) + break; + } + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + if (conn != null) { + conn.close(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java index e2eeed0..a07cbe4 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; /** * @author yangli9 @@ -50,7 +51,7 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws 
IOException { return next(result); } @@ -60,11 +61,16 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @Override + public int getBatch() { + return -1; + } + + @Override public void close() throws IOException { scanner.close(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java index d188a44..bbe3397 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java @@ -24,7 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.kv.RowValueDecoder; @@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private ITupleIterator segmentIterator; private int scanCount; - public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { + public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, 
Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) { this.context = context; int limit = context.getLimit(); http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java index 8587075..450b1ae 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java @@ -26,8 +26,9 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -80,14 +81,14 @@ public class EndpointTupleIterator implements ITupleIterator { Iterator<List<IIProtos.IIResponse.IIRow>> regionResponsesIterator = null; ITupleIterator tupleIterator = null; - HTableInterface table = null; + Table table = null; int rowsInAllMetric = 0; - public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws Throwable { + public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection 
conn) throws Throwable { String tableName = segment.getStorageLocationIdentifier(); - table = conn.getTable(tableName); + table = conn.getTable(TableName.valueOf(tableName)); factTableName = segment.getIIDesc().getFactTableName(); if (rootFilter == null) { @@ -213,7 +214,7 @@ public class EndpointTupleIterator implements ITupleIterator { } //TODO : async callback - private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable { + private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, Table table) throws Throwable { Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() { public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException { ServerRpcController controller = new ServerRpcController(); @@ -236,7 +237,7 @@ public class EndpointTupleIterator implements ITupleIterator { int index = 0; for (int i = 0; i < columns.size(); i++) { - TblColRef column = columns.get(i); + // TblColRef column = columns.get(i); // if (!dimensions.contains(column)) { // continue; // } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java index a770f55..adf1bf1 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java @@ -90,7 +90,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop 
RegionScanner innerScanner = null; HRegion region = null; try { - region = env.getRegion(); + region = (HRegion) env.getRegion(); innerScanner = region.getScanner(buildScan()); region.startRegionOperation(); http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java index 2cecd5c..c21ee36 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java @@ -93,7 +93,7 @@ public class AggregateRegionObserver extends BaseRegionObserver { // start/end region operation & sync on scanner is suggested by the // javadoc of RegionScanner.nextRaw() // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? 
by mhb - HRegion region = ctxt.getEnvironment().getRegion(); + HRegion region = (HRegion) ctxt.getEnvironment().getRegion(); region.startRegionOperation(); try { synchronized (innerScanner) { http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java index 8075bc3..eaa7d20 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter; @@ -93,13 +94,18 @@ public class AggregationScanner implements RegionScanner { } @Override + public int getBatch() { + return outerScanner.getBatch(); + } + + @Override public boolean next(List<Cell> results) throws IOException { return outerScanner.next(results); } @Override - public boolean next(List<Cell> result, int limit) throws IOException { - return outerScanner.next(result, limit); + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return outerScanner.next(result, scannerContext); } @Override @@ -108,8 +114,8 @@ public class AggregationScanner implements RegionScanner { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws 
IOException { - return outerScanner.nextRaw(result, limit); + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + return outerScanner.nextRaw(result, scannerContext); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java index f5fb497..b1f642f 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.coprocessor.AggregationCache; @@ -107,7 +108,7 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -117,11 +118,16 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } @Override + public int getBatch() { + 
return innerScanner.getBatch(); + } + + @Override public void close() throws IOException { // AggregateRegionObserver.LOG.info("Kylin Scanner close()"); innerScanner.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java index 5278326..b941a5e 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java @@ -23,9 +23,9 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; @@ -58,7 +58,7 @@ public class ObserverEnabler { static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap(); public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // - Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { + Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException { if (context.isCoprocessorEnabled() == false) { return table.getScanner(scan); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java ---------------------------------------------------------------------- diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java index f7fcef1..50069a1 100644 --- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java +++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java @@ -1,115 +1,112 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.storage.hbase; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.index.RawTableRecord; -import org.apache.kylin.invertedindex.index.Slice; -import org.apache.kylin.invertedindex.index.TableRecord; -import org.apache.kylin.invertedindex.index.TableRecordInfo; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Lists; - -/** - * @author yangli9 - */ -public class InvertedIndexHBaseTest extends HBaseMetadataTestCase { - - IIInstance ii; - IISegment seg; - HConnection hconn; - - TableRecordInfo info; - - @Before - public void setup() throws Exception { - this.createTestMetadata(); - - this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii"); - this.seg = ii.getFirstSegment(); - - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - hconn = HConnectionManager.createConnection(hconf); - - this.info = new TableRecordInfo(seg); - } - - @After - public void after() throws Exception { - this.cleanupTestMetadata(); - } - - @Test - public void testLoad() throws Exception { - - String tableName = seg.getStorageLocationIdentifier(); - IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest()); - - List<Slice> slices = Lists.newArrayList(); - HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, 
IIDesc.HBASE_QUALIFIER_BYTES); - try { - for (Slice slice : codec.decodeKeyValue(kvIterator)) { - slices.add(slice); - } - } finally { - kvIterator.close(); - } - - List<TableRecord> records = iterateRecords(slices); - dump(records); - System.out.println(records.size() + " records"); - } - - private List<TableRecord> iterateRecords(List<Slice> slices) { - List<TableRecord> records = Lists.newArrayList(); - for (Slice slice : slices) { - for (RawTableRecord rec : slice) { - records.add(new TableRecord((RawTableRecord) rec.clone(), info)); - } - } - return records; - } - - private void dump(Iterable<TableRecord> records) { - for (TableRecord rec : records) { - System.out.println(rec.toString()); - - byte[] x = rec.getBytes(); - String y = BytesUtil.toReadableText(x); - System.out.println(y); - System.out.println(); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.hbase; + +import java.util.List; + +import org.apache.hadoop.hbase.client.Connection; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.invertedindex.index.RawTableRecord; +import org.apache.kylin.invertedindex.index.Slice; +import org.apache.kylin.invertedindex.index.TableRecord; +import org.apache.kylin.invertedindex.index.TableRecordInfo; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.model.IIKeyValueCodec; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * @author yangli9 + */ +public class InvertedIndexHBaseTest extends HBaseMetadataTestCase { + + IIInstance ii; + IISegment seg; + Connection hconn; + + TableRecordInfo info; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + + this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii"); + this.seg = ii.getFirstSegment(); + + this.hconn = HBaseConnection.get(); + + this.info = new TableRecordInfo(seg); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testLoad() throws Exception { + + String tableName = seg.getStorageLocationIdentifier(); + IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest()); + + List<Slice> slices = Lists.newArrayList(); + HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES); + try { + for (Slice slice : codec.decodeKeyValue(kvIterator)) { + slices.add(slice); + } + } finally { + kvIterator.close(); + } + + List<TableRecord> records = 
iterateRecords(slices); + dump(records); + System.out.println(records.size() + " records"); + } + + private List<TableRecord> iterateRecords(List<Slice> slices) { + List<TableRecord> records = Lists.newArrayList(); + for (Slice slice : slices) { + for (RawTableRecord rec : slice) { + records.add(new TableRecord((RawTableRecord) rec.clone(), info)); + } + } + return records; + } + + private void dump(Iterable<TableRecord> records) { + for (TableRecord rec : records) { + System.out.println(rec.toString()); + + byte[] x = rec.getBytes(); + String y = BytesUtil.toReadableText(x); + System.out.println(y); + System.out.println(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java ---------------------------------------------------------------------- diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java index 0454b4c..3ace91e 100644 --- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java +++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.io.LongWritable; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; @@ -223,102 +224,46 @@ public class AggregateRegionObserverTest { this.input = cellInputs; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util - * .List) - */ @Override 
public boolean next(List<Cell> results) throws IOException { return nextRaw(results); } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util - * .List, int) - */ @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } - /* - * (non-Javadoc) - * - * @see org.apache.hadoop.hbase.regionserver.InternalScanner#close() - */ @Override public void close() throws IOException { } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() - */ @Override public HRegionInfo getRegionInfo() { return null; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() - */ @Override public boolean isFilterDone() throws IOException { return false; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) - */ @Override public boolean reseek(byte[] row) throws IOException { return false; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() - */ @Override public long getMaxResultSize() { return 0; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() - */ @Override public long getMvccReadPoint() { return 0; } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util - * .List) - */ @Override public boolean nextRaw(List<Cell> result) throws IOException { if (i < input.size()) { @@ -328,18 +273,15 @@ public class AggregateRegionObserverTest { return i < input.size(); } - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util - * .List, int) - */ @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { + 
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { return nextRaw(result); } + @Override + public int getBatch() { + return -1; + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/23768410/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java ---------------------------------------------------------------------- diff --git a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java index d17cfa6..b1f6626 100644 --- a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java +++ b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java @@ -27,7 +27,6 @@ import java.sql.SQLException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.service.HiveInterface; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MiniMRCluster; @@ -47,7 +46,7 @@ public class HiveMiniClusterTest extends HiveJDBCClientTest { public static final File HIVE_WAREHOUSE_DIR = new File(HIVE_BASE_DIR + "/warehouse"); public static final File HIVE_TESTDATA_DIR = new File(HIVE_BASE_DIR + "/testdata"); public static final File HIVE_HADOOP_TMP_DIR = new File(HIVE_BASE_DIR + "/hadooptmp"); - protected HiveInterface client; + //protected HiveInterface client; protected MiniDFSCluster miniDFS; protected MiniMRCluster miniMR;