Repository: kylin
Updated Branches:
  refs/heads/master c40e2bf41 -> abe590a20


refine coprocessor suicide


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/37d63f41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/37d63f41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/37d63f41

Branch: refs/heads/master
Commit: 37d63f41a8496f7ea93adaac798f7a09b95c8127
Parents: c40e2bf
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Apr 21 15:58:01 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Apr 21 18:12:30 2016 +0800

----------------------------------------------------------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  9 ++---
 .../coprocessor/endpoint/CubeVisitService.java  | 37 ++++++++++++++------
 2 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index d86b37f..88c17ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -124,7 +124,9 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
             this.timeout = 
HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
             this.timeout *= 
KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-            logger.info("Timeout for ExpectedSizeIterator is " + this.timeout);
+
+            this.timeout *= 1.1;//allow for some delay 
+            logger.info("Timeout for ExpectedSizeIterator is: " + 
this.timeout);
         }
 
         @Override
@@ -317,7 +319,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         logger.debug("Serialized scanRequestBytes {} bytes, rawScanBytesString 
{} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
-        logger.info("The scan {} for segment {} is as below with {} separate 
raw scans, shard part of start/end key is set to 0", 
Integer.toHexString(System.identityHashCode(scanRequest)), 
cubeSeg,rawScans.size());
+        logger.info("The scan {} for segment {} is as below with {} separate 
raw scans, shard part of start/end key is set to 0", 
Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, 
rawScans.size());
         for (RawScan rs : rawScans) {
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
@@ -358,8 +360,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                         if (result.getValue().getStats().getNormalComplete() 
!= 1) {
                             abnormalFinish = true;
-                        }
-                        else {
+                        } else {
                             try {
                                 
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                             } catch (IOException | DataFormatException e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/37d63f41/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 585908b..0cd35f1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -162,7 +162,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
     }
 
     @Override
-    public void visitCube(RpcController controller, 
CubeVisitProtos.CubeVisitRequest request, 
RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
+    public void visitCube(final RpcController controller, 
CubeVisitProtos.CubeVisitRequest request, 
RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {
 
         List<RegionScanner> regionScanners = Lists.newArrayList();
         HRegion region = null;
@@ -178,7 +178,6 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             region.startRegionOperation();
             debugGitTag = 
region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);
 
-
             final GTScanRequest scanReq = 
GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
             List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
             for (IntList intList : request.getHbaseColumnsToGTList()) {
@@ -230,9 +229,9 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                 scanReq.setAggrCacheGB(0); // disable mem check if so told
             }
 
-            final MutableBoolean normalComplete = new MutableBoolean(true);
-            final long startTime = 
this.serviceStartTime;//request.getStartTime();
-            final long timeout = (long) (request.getTimeout() * 0.95);
+            final MutableBoolean scanNormalComplete = new MutableBoolean(true);
+            final long startTime = this.serviceStartTime;
+            final long timeout = request.getTimeout();
 
             final CellListIterator cellListIterator = new CellListIterator() {
 
@@ -247,12 +246,18 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
-                    if (counter++ % 1000 == 1) {
+                    if (counter % 1000 == 1) {
                         if (System.currentTimeMillis() - startTime > timeout) {
-                            normalComplete.setValue(false);
+                            scanNormalComplete.setValue(false);
+                            logger.error("scanner aborted because timeout");
                             return false;
                         }
                     }
+
+                    if (counter % 100000 == 1) {
+                        logger.info("Scanned " + counter + " rows.");
+                    }
+                    counter++;
                     return allCellLists.hasNext();
                 }
 
@@ -279,6 +284,19 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream
 will auto grow
             int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
+
+                if (!scanNormalComplete.booleanValue()) {
+                    logger.error("aggregate iterator aborted because input 
iterator aborts");
+                    break;
+                }
+
+                if (finalRowCount % 1000 == 1) {
+                    if (System.currentTimeMillis() - startTime > timeout) {
+                        logger.error("aggregate iterator aborted because 
timeout");
+                        break;
+                    }
+                }
+
                 buffer.clear();
                 oneRecord.exportColumns(scanReq.getColumns(), buffer);
                 buffer.flip();
@@ -292,7 +310,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
 
             //outputStream.close() is not necessary
             byte[] compressedAllRows;
-            if (normalComplete.booleanValue()) {
+            if (scanNormalComplete.booleanValue()) {
                 allRows = outputStream.toByteArray();
             } else {
                 allRows = new byte[0];
@@ -309,7 +327,6 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             appendProfileInfo(sb, "server stats done");
             sb.append(" debugGitTag:" + debugGitTag);
 
-
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = 
CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
                     
setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many 
array copies 
@@ -323,7 +340,7 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
                             setFreeSwapSpaceSize(freeSwapSpaceSize).//
                             
setHostname(InetAddress.getLocalHost().getHostName()).// 
                             setEtcMsg(sb.toString()).//
-                            setNormalComplete(normalComplete.booleanValue() ? 
1 : 0).build())
+                            
setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
                     .//
                     build());
 

Reply via email to