Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 406a02a15 -> 35a5d87af


KYLIN-1205 Patch the scan in HBaseResourceStore as well


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/35a5d87a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/35a5d87a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/35a5d87a

Branch: refs/heads/2.x-staging
Commit: 35a5d87af4098f49bc8caced2ad0ee5d98e6d5ad
Parents: 406a02a
Author: Yang Li <liy...@apache.org>
Authored: Fri Dec 25 20:50:19 2015 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Fri Dec 25 20:50:26 2015 +0800

----------------------------------------------------------------------
 .../kylin/storage/hbase/HBaseResourceStore.java | 101 +++++++++----------
 1 file changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/35a5d87a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 72192a5..6777d88 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -40,10 +40,10 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -159,6 +159,7 @@ public class HBaseResourceStore extends ResourceStore {
         Scan scan = new Scan(startRow, endRow);
         scan.addColumn(B_FAMILY, B_COLUMN_TS);
         scan.addColumn(B_FAMILY, B_COLUMN);
+        tuneScanParameters(scan);
 
         HTableInterface table = getConnection().getTable(getAllInOneTableName());
         List<RawResource> result = Lists.newArrayList();
@@ -178,53 +179,51 @@ public class HBaseResourceStore extends ResourceStore {
         return result;
     }
 
-    @Override
-    protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
-        byte[] startRow = Bytes.toBytes(rangeStart);
-        byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
-
-        Scan scan = new Scan(startRow, endRow);
-        scan.addColumn(B_FAMILY, B_COLUMN_TS);
-        scan.addColumn(B_FAMILY, B_COLUMN);
-        scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
-
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
-        List<RawResource> result = Lists.newArrayList();
-        try {
-            ResultScanner scanner = table.getScanner(scan);
-            for (Result r : scanner) {
-                result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
-            }
-        } catch (IOException e) {
-            for (RawResource rawResource : result) {
-                IOUtils.closeQuietly(rawResource.inputStream);
-            }
-            throw e;
-        } finally {
-            IOUtils.closeQuietly(table);
-        }
-        return result;
-    }
-
-    private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
-        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
-        SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(
-                B_FAMILY,
-                B_COLUMN_TS,
-                CompareFilter.CompareOp.GREATER,
-                Bytes.toBytes(timeStartInMillis)
-        );
-        filterList.addFilter(timeStartFilter);
-        SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(
-                B_FAMILY,
-                B_COLUMN_TS,
-                CompareFilter.CompareOp.LESS_OR_EQUAL,
-                Bytes.toBytes(timeEndInMillis)
-        );
-        filterList.addFilter(timeEndFilter);
-        return filterList;
-    }
-
+    @Override
+    protected List<RawResource> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException {
+        byte[] startRow = Bytes.toBytes(rangeStart);
+        byte[] endRow = plusZero(Bytes.toBytes(rangeEnd));
+
+        Scan scan = new Scan(startRow, endRow);
+        scan.addColumn(B_FAMILY, B_COLUMN_TS);
+        scan.addColumn(B_FAMILY, B_COLUMN);
+        scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis));
+        tuneScanParameters(scan);
+
+        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        List<RawResource> result = Lists.newArrayList();
+        try {
+            ResultScanner scanner = table.getScanner(scan);
+            for (Result r : scanner) {
+                result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r)));
+            }
+        } catch (IOException e) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw e;
+        } finally {
+            IOUtils.closeQuietly(table);
+        }
+        return result;
+    }
+
+    private void tuneScanParameters(Scan scan) {
+        // divide by 10 as some resource like dictionary or snapshot can be very large
+        scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
+        scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
+        scan.setCacheBlocks(true);
+    }
+
+    private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) {
+        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.GREATER, Bytes.toBytes(timeStartInMillis));
+        filterList.addFilter(timeStartFilter);
+        SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter(B_FAMILY, B_COLUMN_TS, CompareFilter.CompareOp.LESS_OR_EQUAL, Bytes.toBytes(timeEndInMillis));
+        filterList.addFilter(timeEndFilter);
+        return filterList;
+    }
+
     private InputStream getInputStream(String resPath, Result r) throws IOException {
         if (r == null) {
             return null;
@@ -262,7 +261,7 @@ public class HBaseResourceStore extends ResourceStore {
     protected long getResourceTimestampImpl(String resPath) throws IOException {
         return getTimestamp(getByScan(resPath, false, true));
     }
-    
+
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
         ByteArrayOutputStream bout = new ByteArrayOutputStream();

Reply via email to