Repository: kylin
Updated Branches:
  refs/heads/2.x-staging f1cd86e5a -> e5653278c


KYLIN-1350 - hbase Result.binarySearch is found to be problematic in concurrent environments


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5653278
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5653278
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5653278

Branch: refs/heads/2.x-staging
Commit: e5653278cb1236aa93e11dd8b68356d04e2e277b
Parents: f1cd86e
Author: honma <ho...@ebay.com>
Authored: Wed Jan 20 15:15:54 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Jan 20 17:53:36 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |  1 +
 .../cube/v2/SequentialCubeTupleIterator.java    |  2 +
 .../storage/hbase/steps/RowValueDecoder.java    | 14 ++++--
 .../kylin/storage/hbase/util/Results.java       | 49 ++++++++++++++++++++
 4 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index f8edee2..1232cb2 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -307,6 +307,7 @@ public class CubeSegmentTupleIterator implements 
ITupleIterator {
                 logger.debug("HBase Metrics when scanning " + this.tableName + 
" count={}, ms={}, bytes={}, remote_bytes={}, regions={}, 
not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, 
remote_rpc_retries={}", //
                         new Object[] { scanCount, 
scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, 
scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, 
scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, 
scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, 
scanMetrics.countOfRemoteRPCRetries });
             }
+            scan = null;
         }
         try {
             if (scanner != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 77509e9..f0cebfe 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -144,6 +144,8 @@ public class SequentialCubeTupleIterator implements 
ITupleIterator {
 
     @Override
     public void close() {
+        // hasNext() loop may exit because of limit, threshold, etc.
+        // close all the remaining segmentIterator
         flushScanCountDelta();
 
         if (curScanner != null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 59a5fed..7d05dc9 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -30,11 +30,16 @@ import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.storage.hbase.util.Results;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
 public class RowValueDecoder implements Cloneable {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(RowValueDecoder.class);
+
     private final HBaseColumnDesc hbaseColumn;
     private final byte[] hbaseColumnFamily;
     private final byte[] hbaseColumnQualifier;
@@ -57,13 +62,14 @@ public class RowValueDecoder implements Cloneable {
     public void decodeAndConvertJavaObj(Result hbaseRow) {
         decode(hbaseRow, true);
     }
-    
+
     public void decode(Result hbaseRow) {
         decode(hbaseRow, false);
     }
 
     private void decode(Result hbaseRow, boolean convertToJavaObject) {
-        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, 
hbaseColumnQualifier), convertToJavaObject);
+        ByteBuffer buffer = Results.getValueAsByteBuffer(hbaseRow, 
hbaseColumnFamily, hbaseColumnQualifier);
+        decode(buffer, convertToJavaObject);
     }
 
     public void decodeAndConvertJavaObj(byte[] bytes) {
@@ -73,7 +79,7 @@ public class RowValueDecoder implements Cloneable {
     public void decode(byte[] bytes) {
         decode(ByteBuffer.wrap(bytes), false);
     }
-    
+
     private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
         codec.decode(buffer, values);
         if (convertToJavaObject) {
@@ -113,7 +119,7 @@ public class RowValueDecoder implements Cloneable {
     public MeasureDesc[] getMeasures() {
         return measures;
     }
-    
+
     // result is in order of <code>CubeDesc.getMeasures()</code>
     public void loadCubeMeasureArray(Object result[]) {
         int[] measureIndex = hbaseColumn.getMeasureIndex();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java
new file mode 100644
index 0000000..f619007
--- /dev/null
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * The helper class is introduced because {@link Result#binarySearch(Cell[], byte[], int, int, byte[], int, int)} 
+ * is found to be problematic in concurrent environments, and unfortunately  {@link Result#getValueAsByteBuffer(byte[], byte[])} 
+ * calls it.
+ */
+public class Results {
+    public static ByteBuffer getValueAsByteBuffer(Result hbaseRow, byte[] cf, 
byte[] cq) {
+        List<Cell> cells = hbaseRow.listCells();
+        if (cells == null || cells.size() == 0) {
+            return null;
+        } else {
+            for (Cell c : cells) {
+                if (Bytes.compareTo(cf, 0, cf.length, c.getFamilyArray(), 
c.getFamilyOffset(), c.getFamilyLength()) == 0 && //
+                        Bytes.compareTo(cq, 0, cq.length, 
c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()) == 0) {
+                    return ByteBuffer.wrap(c.getValueArray(), 
c.getValueOffset(), c.getValueLength());
+                }
+            }
+        }
+        return null;
+    }
+
+}

Reply via email to