Repository: kylin
Updated Branches:
  refs/heads/master c40e2bf41 -> abe590a20
refine coprocessor suicide Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37d63f41 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37d63f41 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37d63f41 Branch: refs/heads/master Commit: 37d63f41a8496f7ea93adaac798f7a09b95c8127 Parents: c40e2bf Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Apr 21 15:58:01 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Apr 21 18:12:30 2016 +0800 ---------------------------------------------------------------------- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 9 ++--- .../coprocessor/endpoint/CubeVisitService.java | 37 ++++++++++++++------ 2 files changed, 32 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index d86b37f..88c17ae 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -124,7 +124,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes(); - logger.info("Timeout for ExpectedSizeIterator is " + this.timeout); + + this.timeout *= 1.1;//allow for some delay + 
logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout); } @Override @@ -317,7 +319,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size()); - logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg,rawScans.size()); + logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size()); for (RawScan rs : rawScans) { logScan(rs, cubeSeg.getStorageLocationIdentifier()); } @@ -358,8 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { if (result.getValue().getStats().getNormalComplete() != 1) { abnormalFinish = true; - } - else { + } else { try { epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); } catch (IOException | DataFormatException e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 585908b..0cd35f1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -162,7 +162,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService 
implement } @Override - public void visitCube(RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { + public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) { List<RegionScanner> regionScanners = Lists.newArrayList(); HRegion region = null; @@ -178,7 +178,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement region.startRegionOperation(); debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag); - final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList(); for (IntList intList : request.getHbaseColumnsToGTList()) { @@ -230,9 +229,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement scanReq.setAggrCacheGB(0); // disable mem check if so told } - final MutableBoolean normalComplete = new MutableBoolean(true); - final long startTime = this.serviceStartTime;//request.getStartTime(); - final long timeout = (long) (request.getTimeout() * 0.95); + final MutableBoolean scanNormalComplete = new MutableBoolean(true); + final long startTime = this.serviceStartTime; + final long timeout = request.getTimeout(); final CellListIterator cellListIterator = new CellListIterator() { @@ -247,12 +246,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public boolean hasNext() { - if (counter++ % 1000 == 1) { + if (counter % 1000 == 1) { if (System.currentTimeMillis() - startTime > timeout) { - normalComplete.setValue(false); + scanNormalComplete.setValue(false); + logger.error("scanner aborted because timeout"); return false; } } + + if (counter % 100000 == 1) { + logger.info("Scanned " + counter + " rows."); + } + counter++; return 
allCellLists.hasNext(); } @@ -279,6 +284,19 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow int finalRowCount = 0; for (GTRecord oneRecord : finalScanner) { + + if (!scanNormalComplete.booleanValue()) { + logger.error("aggregate iterator aborted because input iterator aborts"); + break; + } + + if (finalRowCount % 1000 == 1) { + if (System.currentTimeMillis() - startTime > timeout) { + logger.error("aggregate iterator aborted because timeout"); + break; + } + } + buffer.clear(); oneRecord.exportColumns(scanReq.getColumns(), buffer); buffer.flip(); @@ -292,7 +310,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement //outputStream.close() is not necessary byte[] compressedAllRows; - if (normalComplete.booleanValue()) { + if (scanNormalComplete.booleanValue()) { allRows = outputStream.toByteArray(); } else { allRows = new byte[0]; @@ -309,7 +327,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement appendProfileInfo(sb, "server stats done"); sb.append(" debugGitTag:" + debugGitTag); - CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies @@ -323,7 +340,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setFreeSwapSpaceSize(freeSwapSpaceSize).// setHostname(InetAddress.getLocalHost().getHostName()).// setEtcMsg(sb.toString()).// - setNormalComplete(normalComplete.booleanValue() ? 1 : 0).build()) + setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build()) .// build());