Repository: kylin Updated Branches: refs/heads/2.x-staging 406a02a15 -> 35a5d87af
KYLIN-1205 Patch the scan in HBaseResourceStore as well Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/35a5d87a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/35a5d87a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/35a5d87a Branch: refs/heads/2.x-staging Commit: 35a5d87af4098f49bc8caced2ad0ee5d98e6d5ad Parents: 406a02a Author: Yang Li <liy...@apache.org> Authored: Fri Dec 25 20:50:19 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Fri Dec 25 20:50:26 2015 +0800 ---------------------------------------------------------------------- .../kylin/storage/hbase/HBaseResourceStore.java | 101 +++++++++---------- 1 file changed, 50 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/35a5d87a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 72192a5..6777d88 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -40,10 +40,10 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -159,6 +159,7 @@ public class HBaseResourceStore extends ResourceStore { Scan scan = new Scan(startRow, endRow); scan.addColumn(B_FAMILY, B_COLUMN_TS); scan.addColumn(B_FAMILY, B_COLUMN); + tuneScanParameters(scan); HTableInterface table = getConnection().getTable(getAllInOneTableName()); List<RawResource> result = Lists.newArrayList(); @@ -178,53 +179,51 @@ public class HBaseResourceStore extends ResourceStore { return result; } - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis)); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List<RawResource> result = Lists.newArrayList(); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } - } catch (IOException e) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.inputStream); - } - throw e; - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { - FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); - SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter( - B_FAMILY, - B_COLUMN_TS, - CompareFilter.CompareOp.GREATER, - Bytes.toBytes(timeStartInMillis) - ); - filterList.addFilter(timeStartFilter); - SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter( - B_FAMILY, - B_COLUMN_TS, - CompareFilter.CompareOp.LESS_OR_EQUAL, - Bytes.toBytes(timeEndInMillis) - ); - filterList.addFilter(timeEndFilter); - return filterList; - } - + @Override + protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { + byte[] startRow = Bytes.toBytes(rangeStart); + byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); + + Scan scan = new Scan(startRow, endRow); + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis)); + tuneScanParameters(scan); + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + List<RawResource> result = Lists.newArrayList(); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); + } + } catch (IOException e) { + for (RawResource rawResource : result) { + IOUtils.closeQuietly(rawResource.inputStream); + } + throw e; + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + private void tuneScanParameters(Scan scan) { + // divide by 10 as some resource like dictionary or snapshot can be very large + scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10); + scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize()); + scan.setCacheBlocks(true); + } + + private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis)); + filterList.addFilter(timeStartFilter); + SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis)); + filterList.addFilter(timeEndFilter); + return filterList; + } + private InputStream getInputStream(String resPath, Result r) throws IOException { if (r == null) { return null; @@ -262,7 +261,7 @@ public class HBaseResourceStore extends ResourceStore { protected long getResourceTimestampImpl(String resPath) throws IOException { return getTimestamp(getByScan(resPath, false, true)); } - + @Override protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { ByteArrayOutputStream bout = new ByteArrayOutputStream();