http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java index c58f227..2b2e490 100755 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/FilterDecorator.java @@ -60,14 +60,12 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator { return inevaluableColumns; } - private TupleFilter replaceConstantsWithLocalDict(CompareTupleFilter oldCompareFilter, - CompareTupleFilter newCompareFilter) { + private TupleFilter replaceConstantsWithLocalDict(CompareTupleFilter oldCompareFilter, CompareTupleFilter newCompareFilter) { //TODO localdict: (performance issue) transalte() with roundingflag 0 will use try catch exceptions to deal with non-existing entries return replaceConstantsWithGlobalDict(oldCompareFilter, newCompareFilter); } - private TupleFilter replaceConstantsWithGlobalDict(CompareTupleFilter oldCompareFilter, - CompareTupleFilter newCompareFilter) { + private TupleFilter replaceConstantsWithGlobalDict(CompareTupleFilter oldCompareFilter, CompareTupleFilter newCompareFilter) { Collection<String> constValues = (Collection<String>) oldCompareFilter.getValues(); String firstValue = constValues.iterator().next(); TblColRef col = newCompareFilter.getColumn(); @@ -219,4 +217,4 @@ public class FilterDecorator implements TupleFilterSerializer.Decorator { columnIO.writeColumn(column, v, roundingFlag, DimensionEncoding.NULL, id, 0); return Dictionary.dictIdToString(id, 0, id.length); } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java index 56f78dc..3eecba1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java @@ -109,8 +109,7 @@ public class HBaseScannerBenchmark { private void testScanRaw(String msg) throws IOException { long t = System.currentTimeMillis(); - IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null) - .setFilterPushDown(null).createGTScanRequest()); + IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest()); ResultScanner innerScanner = ((SimpleHBaseStore.Reader) scan).getHBaseScanner(); int count = 0; for (Result r : innerScanner) { @@ -126,8 +125,7 @@ public class HBaseScannerBenchmark { private void testScanRecords(String msg) throws IOException { long t = System.currentTimeMillis(); - IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null) - .setFilterPushDown(null).createGTScanRequest()); + IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest()); int count = 0; for (GTRecord rec : scan) { count++; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java index c38e8c8..b12173d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/SimpleHBaseStore.java @@ -170,16 +170,12 @@ public class SimpleHBaseStore implements IGTStore { private void loadRecord(Result r) { Cell[] cells = r.rawCells(); Cell cell = cells[0]; - if (Bytes.compareTo(CF_B, 0, CF_B.length, cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength()) != 0 // - || Bytes.compareTo(COL_B, 0, COL_B.length, cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength()) != 0) + if (Bytes.compareTo(CF_B, 0, CF_B.length, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) != 0 // + || Bytes.compareTo(COL_B, 0, COL_B.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) != 0) throw new IllegalStateException(); - rec.loadCellBlock(0, ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset() + ID_LEN, - cell.getRowLength() - ID_LEN)); - rec.loadCellBlock(1, - ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); + rec.loadCellBlock(0, ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset() + ID_LEN, cell.getRowLength() - ID_LEN)); + rec.loadCellBlock(1, ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 8586fac..e822ada 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -88,18 +88,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (shardNum == totalShards) { //all shards - return Lists.newArrayList( - Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1)))); + return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1)))); } else if (baseShard + shardNum <= totalShards) { //endpoint end key is inclusive, so no need to append 0 or anything - return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), - getByteArrayForShort((short) (baseShard + shardNum - 1)))); + return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1)))); } else { //0,1,2,3,4 wants 4,0 - return Lists.newArrayList( - Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), // - Pair.newPair(getByteArrayForShort((short) 0), - getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1)))); + return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), // + Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1)))); } } @@ -141,18 +137,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout); - logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), - rawScanByteString.size()); + logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); - logger.info( - "The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", - Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); + logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); for (RawScan rs : rawScans) { logScan(rs, cubeSeg.getStorageLocationIdentifier()); } - logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, - cuboidBaseShard, rawScans.size()); + logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size()); // KylinConfig: use env instance instead of CubeSegment, because KylinConfig will share among queries // for different cubes until redeployment of coprocessor jar. @@ -177,13 +169,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public void run() { - final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, - Integer.toHexString(System.identityHashCode(scanRequest))); + final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest))); final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>(); try { - Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), - HBaseConnection.getCoprocessorPool()); + Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool()); final CubeVisitRequest request = builder.build(); final byte[] startKey = epRange.getFirst(); @@ -225,20 +215,15 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return; } - if (queryContext.getScannedBytes() > cubeSeg.getConfig() - .getQueryMaxScanBytes()) { - throw new ResourceLimitExceededException("Query scanned " - + queryContext.getScannedBytes() + " bytes exceeds threshold " - + cubeSeg.getConfig().getQueryMaxScanBytes()); + if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) { + throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes()); } try { if (compressionResult) { - epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString - .zeroCopyGetBytes(result.getCompressedRows()))); + epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()))); } else { - epResultItr.append(HBaseZeroCopyByteString - .zeroCopyGetBytes(result.getCompressedRows())); + epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())); } } catch (IOException | DataFormatException e) { throw new RuntimeException(logHeader + "Error when decompressing", e); @@ -293,8 +278,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { RawScan.serializer.serialize(rs, rawScanBuffer); } rawScanBuffer.flip(); - rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), - rawScanBuffer.limit()); + rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit()); break; } catch (BufferOverflowException boe) { logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize); @@ -309,16 +293,12 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Stats stats = result.getStats(); byte[] compressedRows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()); - sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ") - .append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); + sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". "); sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". "); sb.append("Total filtered/aggred row: ").append(stats.getAggregatedRowCount()).append(". "); - sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()) - .append("(ms). "); - sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ") - .append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:") - .append(stats.getFreeSwapSpaceSize()).append("."); + sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). "); + sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append("."); sb.append("Etc message: ").append(stats.getEtcMsg()).append("."); sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append("."); sb.append("Compressed row size: ").append(compressedRows.length); @@ -328,8 +308,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private RuntimeException getCoprocessorException(CubeVisitResponse response) { if (!response.hasErrorInfo()) { - return new RuntimeException( - "Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message"); + return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message"); } CubeVisitResponse.ErrorInfo errorInfo = response.getErrorInfo(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 97d2373..db81646 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -72,7 +72,7 @@ public abstract class CubeHBaseRPC implements IGTStorage { public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); - + this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; @@ -106,8 +106,7 @@ public abstract class CubeHBaseRPC implements IGTStorage { return scan; } - private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, - ImmutableBitSet selectedColBlocks) { + private RawScan preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); LazyRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid); @@ -247,12 +246,10 @@ public abstract class CubeHBaseRPC implements IGTStorage { } } - private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair( - List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) { + private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) { List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList(); for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) { - org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>( - pair.getFirst(), pair.getSecond()); + org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond()); result.add(element); } @@ -295,24 +292,23 @@ public abstract class CubeHBaseRPC implements IGTStorage { } else { coopTimeout = cubeSeg.getConfig().getQueryCoprocessorTimeoutSeconds() * 1000; } - + int rpcTimeout; Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - + // HBase rpc timeout must be longer than coprocessor timeout if ((int) (coopTimeout * 1.1) > rpcTimeout) { rpcTimeout = (int) (coopTimeout * 1.1); hconf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); } - + // coprocessor timeout is 0 by default if (coopTimeout <= 0) { coopTimeout = (int) (rpcTimeout * 0.9); } - - logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, - coopTimeout); + + logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout); return coopTimeout; } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index f258efb..951e2ef 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -106,8 +106,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } else { List<byte[]> ret = Lists.newArrayList(); for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, - cubeSeg.getTotalShards(cuboid.getId())); + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards(cuboid.getId())); byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); ret.add(cookedKey); @@ -194,13 +193,11 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, - hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false); IGTScanner rawScanner = store.scan(scanRequest); final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); - final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, - decorateScanner.iterator()); + final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator()); return new IGTScanner() { @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index adc210e..59fe9e0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -68,11 +68,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> { } if (ret == null) { - throw new RuntimeException( - "Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " - + // - GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" - + coprocessorTimeout + ") cannot support this many scans?"); + throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + // + GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?"); } return ret; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index 2e1bac4..631e8e8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -45,12 +45,11 @@ public class HBaseReadonlyStore implements IGTStore { private int rowkeyPreambleSize; private boolean withDelay = false; + /** * @param withDelay is for test use */ - public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, - List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, - boolean withDelay) { + public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) { this.cellListIterator = cellListIterator; this.info = gtScanRequest.getInfo(); this.hbaseColumns = hbaseColumns; @@ -77,10 +76,8 @@ public class HBaseReadonlyStore implements IGTStore { //TODO: possible to use binary search as cells might be sorted? public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) { for (Cell c : cells) { - if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 - && // - BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), - columnName.length) == 0) { + if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && // + BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) { return c; } } @@ -124,9 +121,7 @@ public class HBaseReadonlyStore implements IGTStore { // dimensions, set to primary key, also the 0th column block Cell firstCell = oneRow.get(0); - ByteBuffer buf = byteBuffer(firstCell.getRowArray(), - rowkeyPreambleSize + firstCell.getRowOffset(), - firstCell.getRowLength() - rowkeyPreambleSize); + ByteBuffer buf = byteBuffer(firstCell.getRowArray(), rowkeyPreambleSize + firstCell.getRowOffset(), firstCell.getRowLength() - rowkeyPreambleSize); oneRecord.loadCellBlock(0, buf); // metrics http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index bfe4f44..cde127e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -149,8 +149,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement private long rowCount; private long rowBytes; - ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, long rowCountLimit, long bytesLimit, - long timeout) { + ResourceTrackingCellListIterator(Iterator<List<Cell>> delegate, + long rowCountLimit, long bytesLimit, long timeout) { this.delegate = delegate; this.rowCountLimit = rowCountLimit; this.bytesLimit = bytesLimit; @@ -164,8 +164,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit); } if (rowBytes > bytesLimit) { - throw new ResourceLimitExceededException( - "scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); + throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); } if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms"); @@ -196,8 +195,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement if (shardLength == 0) { return; } - byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] - : region.getRegionInfo().getStartKey(); + byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey(); Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength); Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); } @@ -222,8 +220,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @SuppressWarnings("checkstyle:methodlength") @Override - public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, - RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { + public void visitCube(final RpcController controller, final CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { List<RegionScanner> regionScanners = Lists.newArrayList(); HRegion region = null; @@ -237,7 +234,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { this.serviceStartTime = System.currentTimeMillis(); - region = (HRegion) env.getRegion(); + region = (HRegion)env.getRegion(); region.startRegionOperation(); // if user change kylin.properties on kylin server, need to manually redeploy coprocessor jar to update KylinConfig of Env. @@ -247,15 +244,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag); - final GTScanRequest scanReq = GTScanRequest.serializer - .deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); + final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { hbaseColumnsToGT.add(intList.getIntsList()); } StorageSideBehavior behavior = StorageSideBehavior.valueOf(scanReq.getStorageBehavior()); - final List<RawScan> hbaseRawScans = deserializeRawScans( - ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); + final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()))); appendProfileInfo(sb, "start latency: " + (this.serviceStartTime - scanReq.getStartTime())); @@ -264,8 +259,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement for (RawScan hbaseRawScan : hbaseRawScans) { if (request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN > 0) { //if has shard, fill region shard to raw scan start/end - updateRawScanByCurrentRegion(hbaseRawScan, region, - request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); + updateRawScanByCurrentRegion(hbaseRawScan, region, request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN); } Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan); @@ -296,17 +290,16 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); - ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator(allCellLists, + ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator( + allCellLists, scanReq.getStorageScanRowNumThreshold(), // for old client (scan threshold) !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client scanReq.getTimeout()); - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, - hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn()); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn()); IGTScanner rawScanner = store.scan(scanReq); - IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), - behavior.aggrToggledOn(), false, request.getSpillEnabled()); + IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled()); ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); @@ -337,20 +330,22 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } catch (KylinTimeoutException e) { logger.info("Abort scan: {}", e.getMessage()); errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() - .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT).setMessage(e.getMessage()) + .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT) + .setMessage(e.getMessage()) .build(); } catch (ResourceLimitExceededException e) { logger.info("Abort scan: {}", e.getMessage()); errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED) - .setMessage(e.getMessage()).build(); + .setMessage(e.getMessage()) + .build(); } finally { finalScanner.close(); } appendProfileInfo(sb, "agg done"); - logger.info("Total scanned {} rows and {} bytes", cellListIterator.getTotalScannedRowCount(), - cellListIterator.getTotalScannedRowBytes()); + logger.info("Total scanned {} rows and {} bytes", + cellListIterator.getTotalScannedRowCount(), cellListIterator.getTotalScannedRowBytes()); //outputStream.close() is not necessary byte[] compressedAllRows; @@ -368,8 +363,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "compress done"); logger.info("Size of final result = {} ({} before compressing)", compressedAllRows.length, allRows.length); - OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory - .getOperatingSystemMXBean(); + OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad(); double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize(); double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize(); @@ -383,15 +377,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies - setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder() - .setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount) - .setScannedRowCount(cellListIterator.getTotalScannedRowCount()) - .setScannedBytes(cellListIterator.getTotalScannedRowBytes()) - .setServiceStartTime(serviceStartTime).setServiceEndTime(System.currentTimeMillis()) - .setSystemCpuLoad(systemCpuLoad).setFreePhysicalMemorySize(freePhysicalMemorySize) - .setFreeSwapSpaceSize(freeSwapSpaceSize) - .setHostname(InetAddress.getLocalHost().getHostName()).setEtcMsg(sb.toString()) - .setNormalComplete(errorInfo == null ? 1 : 0).build()) + setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder(). + setAggregatedRowCount(cellListIterator.getTotalScannedRowCount() - finalRowCount). + setScannedRowCount(cellListIterator.getTotalScannedRowCount()). + setScannedBytes(cellListIterator.getTotalScannedRowBytes()). + setServiceStartTime(serviceStartTime). + setServiceEndTime(System.currentTimeMillis()). + setSystemCpuLoad(systemCpuLoad). + setFreePhysicalMemorySize(freePhysicalMemorySize). + setFreeSwapSpaceSize(freeSwapSpaceSize). + setHostname(InetAddress.getLocalHost().getHostName()). + setEtcMsg(sb.toString()). + setNormalComplete(errorInfo == null ? 1 : 0).build()) .build()); } catch (IOException ioe) {