Repository: kylin Updated Branches: refs/heads/2.x-staging f1cd86e5a -> e5653278c
KYLIN-1350 - hbase Result.binarySearch is found to be problematic in concurrent environments Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5653278 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5653278 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5653278 Branch: refs/heads/2.x-staging Commit: e5653278cb1236aa93e11dd8b68356d04e2e277b Parents: f1cd86e Author: honma <ho...@ebay.com> Authored: Wed Jan 20 15:15:54 2016 +0800 Committer: honma <ho...@ebay.com> Committed: Wed Jan 20 17:53:36 2016 +0800 ---------------------------------------------------------------------- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 1 + .../cube/v2/SequentialCubeTupleIterator.java | 2 + .../storage/hbase/steps/RowValueDecoder.java | 14 ++++-- .../kylin/storage/hbase/util/Results.java | 49 ++++++++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index f8edee2..1232cb2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -307,6 +307,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { logger.debug("HBase Metrics when scanning " + this.tableName + " count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", // new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries }); } + scan = null; } try { if (scanner != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java index 77509e9..f0cebfe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java @@ -144,6 +144,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public void close() { + // hasNext() loop may exit because of limit, threshold, etc. + // close all the remaining segmentIterator flushScanCountDelta(); if (curScanner != null) http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java index 59a5fed..7d05dc9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java @@ -30,11 +30,16 @@ import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.storage.hbase.util.Results; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** */ public class RowValueDecoder implements Cloneable { + private static final Logger logger = LoggerFactory.getLogger(RowValueDecoder.class); + private final HBaseColumnDesc hbaseColumn; private final byte[] hbaseColumnFamily; private final byte[] hbaseColumnQualifier; @@ -57,13 +62,14 @@ public class RowValueDecoder implements Cloneable { public void decodeAndConvertJavaObj(Result hbaseRow) { decode(hbaseRow, true); } - + public void decode(Result hbaseRow) { decode(hbaseRow, false); } private void decode(Result hbaseRow, boolean convertToJavaObject) { - decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject); + ByteBuffer buffer = Results.getValueAsByteBuffer(hbaseRow, hbaseColumnFamily, hbaseColumnQualifier); + decode(buffer, convertToJavaObject); } public void decodeAndConvertJavaObj(byte[] bytes) { @@ -73,7 +79,7 @@ public class RowValueDecoder implements Cloneable { public void decode(byte[] bytes) { decode(ByteBuffer.wrap(bytes), false); } - + private void decode(ByteBuffer buffer, boolean convertToJavaObject) { codec.decode(buffer, values); if (convertToJavaObject) { @@ -113,7 +119,7 @@ public class RowValueDecoder implements Cloneable { public MeasureDesc[] getMeasures() { return measures; } - + // result is in order of <code>CubeDesc.getMeasures()</code> public void loadCubeMeasureArray(Object result[]) { int[] measureIndex = hbaseColumn.getMeasureIndex(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5653278/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java new file mode 100644 index 0000000..f619007 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/Results.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.util; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.kylin.common.util.Bytes; + +/** + * The helper class is introduced because {@link Result#binarySearch(Cell[], byte[], int, int, byte[], int, int)} + * is found to be problematic in concurrent environments, and unfortunately {@link Result#getValueAsByteBuffer(byte[], byte[])} + * calls it. + */ +public class Results { + public static ByteBuffer getValueAsByteBuffer(Result hbaseRow, byte[] cf, byte[] cq) { + List<Cell> cells = hbaseRow.listCells(); + if (cells == null || cells.size() == 0) { + return null; + } else { + for (Cell c : cells) { + if (Bytes.compareTo(cf, 0, cf.length, c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength()) == 0 && // + Bytes.compareTo(cq, 0, cq.length, c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()) == 0) { + return ByteBuffer.wrap(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + } + } + return null; + } + +}