KYLIN-2881 Improve HBase coprocessor exception handling at Kylin server side
Signed-off-by: lidongsjtu <lid...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fcac5fcd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fcac5fcd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fcac5fcd

Branch: refs/heads/KYLIN-2881-review
Commit: fcac5fcddd2d7ba56495217a337c617a4dff9026
Parents: 54266f8
Author: Zhong <nju_y...@apache.org>
Authored: Wed Sep 20 09:46:44 2017 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Sun Jan 21 20:01:34 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/query/ITKylinQueryTest.java    |   4 +-
 .../apache/kylin/query/ITMassInQueryTest.java   |   4 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  34 ++-
 .../apache/kylin/rest/service/QueryService.java |  14 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 252 ++++++++++++-------
 .../hbase/cube/v2/ExpectedSizeIterator.java     |  34 ++-
 6 files changed, 224 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 4edfb3d..02a50ce 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         String sql = getTextFromFile(sqlFile);
 
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-        executeQuery(kylinConn, queryFileName, sql, true);
+        execQueryUsingKylin(kylinConn, queryFileName, sql, true);
     }
 
     @Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
             String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
 
             // compare the result

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index cca0be6..16395fc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
             printResult(kylinTable);
         }
 
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2c5b556..e38bb1a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,6 +44,7 @@ import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -228,6 +229,16 @@ public class KylinTestBase {
     // ////////////////////////////////////////////////////////////////////////////////////////
     // execute
 
+    private void initExecQueryUsingKylin(String sql) {
+        QueryContextManager.resetCurrent();
+        QueryContextManager.current();
+    }
+
+    protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+            throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeQuery(dbConn, queryName, sql, needSort);
+    }
 
     protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
             throws Exception {
@@ -251,6 +262,7 @@ public class KylinTestBase {
     }
 
     protected int executeQuery(String sql, boolean needDisplay) throws Exception {
+        initExecQueryUsingKylin(sql);
 
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
@@ -302,6 +314,12 @@ public class KylinTestBase {
         return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
     }
 
+    protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
+            List<String> parameters, boolean needSort) throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
+    }
+
     protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
             List<String> parameters, boolean needSort) throws Exception {
 
@@ -382,7 +400,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             if (BackdoorToggles.getPrepareOnly())
@@ -426,7 +444,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             assertTableEquals(expectTable, kylinTable);
 
@@ -449,7 +467,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -478,7 +496,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + sql);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
 
             try {
                 // compare the result
@@ -510,7 +528,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -561,7 +579,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -601,7 +619,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
+            ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -709,7 +727,7 @@ public class KylinTestBase {
 
         //setup cube conn
         String project = ProjectInstance.DEFAULT_PROJECT_NAME;
-        cubeConnection = QueryConnection.getConnection(project);
+        cubeConnection = QueryDataSource.create(project, config).getConnection();
 
         //setup h2
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8e6642c..71b54e3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -464,7 +464,7 @@
             sqlResponse.setDuration(System.currentTimeMillis() - startTime);
             sqlResponse.setTraceUrl(traceUrl);
 
-            logQuery(sqlRequest, sqlResponse);
+            logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
                 recordMetric(sqlRequest, sqlResponse);
             } catch (Throwable th) {
@@ -477,7 +477,7 @@
         } finally {
             BackdoorToggles.cleanToggles();
-            QueryContext.reset();
+            QueryContextManager.resetCurrent();
             if (scope != null) {
                 scope.close();
             }
@@ -487,6 +487,7 @@
     private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Message msg = MsgPicker.getMsg();
+        final QueryContext queryContext = QueryContextManager.current();
 
         SQLResponse sqlResponse = null;
         try {
@@ -530,13 +531,15 @@
 
             Trace.addTimelineAnnotation("response from execution");
         } catch (Throwable e) { // calcite may throw AssertError
+            queryContext.stop(e);
+
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
             sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-            QueryContext queryContext = QueryContext.current();
             sqlResponse.setTotalScanCount(queryContext.getScannedRows());
             sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+            sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
@@ -1046,6 +1049,8 @@ public class QueryService extends BasicService {
         QueryContext queryContext = QueryContextManager.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
+                String realizationName = "NULL";
+                int realizationType = -1;
                 if (ctx.realization != null) {
                     isPartialResult |= ctx.storageContext.isPartialResultReturned();
                     if (cubeSb.length() > 0) {
@@ -1053,6 +1058,9 @@
                     }
                     cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
+
+                    realizationName = ctx.realization.getName();
+                    realizationType = ctx.realization.getStorageType();
                 }
                 queryContext.setContextRealization(ctx.id, realizationName, realizationType);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 26ab039..ddf62b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,20 +19,23 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.util.Bytes;
@@ -52,7 +55,6 @@ import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
@@ -103,6 +105,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
     }
 
+    static Field channelRowField = null;
+    static {
+        try {
+            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
+            channelRowField.setAccessible(true);
+        } catch (Throwable t) {
+            logger.warn("error when get row field from RegionCoprocessorRpcChannel class", t);
+        }
+    }
+
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
@@ -135,7 +147,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
         scanRequestByteString = serializeGTScanReq(scanRequest);
 
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
 
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(),
                 rawScanByteString.size());
@@ -165,97 +177,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
         builder.setIsExactAggregate(storageContext.isExactAggregation());
 
+        final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryContext.getQueryId(),
+                Integer.toHexString(System.identityHashCode(scanRequest)));
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-
-                    final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
-                    final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
-
-                    try {
-                        Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
-
-                        final CubeVisitRequest request = builder.build();
-                        final byte[] startKey = epRange.getFirst();
-                        final byte[] endKey = epRange.getSecond();
-
-                        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
-                                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
-                                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
-                                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
-                                        rowsService.visitCube(controller, request, rpcCallback);
-                                        CubeVisitResponse response = rpcCallback.get();
-                                        if (controller.failedOnException()) {
-                                            throw controller.getFailedOn();
-                                        }
-                                        return response;
-                                    }
-                                }, new Batch.Callback<CubeVisitResponse>() {
-                                    @Override
-                                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
-                                        if (region == null) {
-                                            return;
-                                        }
-
-                                        logger.info(logHeader + getStatsString(region, result));
-
-                                        Stats stats = result.getStats();
-                                        queryContext.addAndGetScannedRows(stats.getScannedRowCount());
-                                        queryContext.addAndGetScannedBytes(stats.getScannedBytes());
-
-                                        RuntimeException rpcException = null;
-                                        if (result.getStats().getNormalComplete() != 1) {
-                                            rpcException = getCoprocessorException(result);
-                                        }
-                                        queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
-                                                cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
-                                                cuboid.getId(), storageContext.getFilterMask(), rpcException,
-                                                stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
-                                                stats.getScannedRowCount(),
-                                                stats.getScannedRowCount() - stats.getAggregatedRowCount()
-                                                        - stats.getFilteredRowCount(),
-                                                stats.getAggregatedRowCount(), stats.getScannedBytes());
-
-                                        // if any other region has responded with error, skip further processing
-                                        if (regionErrorHolder.get() != null) {
-                                            return;
-                                        }
-
-                                        // record coprocessor error if happened
-                                        if (rpcException != null) {
-                                            regionErrorHolder.compareAndSet(null, rpcException);
-                                            return;
-                                        }
-
-                                        if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
-                                            throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
-                                        }
-
-                                        try {
-                                            if (compressionResult) {
-                                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
-                                            } else {
-                                                epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
-                                            }
-                                        } catch (IOException | DataFormatException e) {
-                                            throw new RuntimeException(logHeader + "Error when decompressing", e);
-                                        }
-                                    }
-                                });
-
-                    } catch (Throwable ex) {
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(ex);
-                        return;
-                    }
-
-                    if (regionErrorHolder.get() != null) {
-                        RuntimeException exception = regionErrorHolder.get();
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(exception);
-                    }
+                    runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
+                            epRange.getSecond(), epResultItr);
                 }
             });
         }
@@ -263,6 +192,149 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
     }
 
+    private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
+            final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
+            final ExpectedSizeIterator epResultItr) {
+
+        final String queryId = queryContext.getQueryId();
+
+        try {
+            final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
+                    HBaseConnection.getCoprocessorPool());
+
+            table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+                    new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+                        public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+                            if (queryContext.isStopped()) {
+                                logger.warn(
+                                        "Query-{}: the query has been stopped, not send request to region server any more.",
+                                        queryId);
+                                return null;
+                            }
+
+                            HRegionLocation regionLocation = getStartRegionLocation(rowsService);
+                            String regionServerName = regionLocation == null ? "UNKNOWN" : regionLocation.getHostname();
+                            logger.info("Query-{}: send request to the init region server {} on table {} ", queryId,
+                                    regionServerName, table.getName());
+
+                            queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
+                                private Thread hConnThread = Thread.currentThread();
+
+                                @Override
+                                public void stop(QueryContext query) {
+                                    try {
+                                        hConnThread.interrupt();
+                                    } catch (Exception e) {
+                                        logger.warn("Exception happens during interrupt thread {} due to {}",
+                                                hConnThread.getName(), e);
+                                    }
+                                }
+                            });
+
+                            ServerRpcController controller = new ServerRpcController();
+                            BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+                            try {
+                                rowsService.visitCube(controller, request, rpcCallback);
+                                CubeVisitResponse response = rpcCallback.get();
+                                if (controller.failedOnException()) {
+                                    throw controller.getFailedOn();
+                                }
+                                return response;
+                            } catch (Exception e) {
+                                throw e;
+                            } finally {
+                                // Reset the interrupted state
+                                Thread.interrupted();
+                            }
+                        }
+
+                        private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) {
+                            try {
+                                CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService;
+                                RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub
                                        .getChannel();
+                                byte[] row = (byte[]) channelRowField.get(channel);
+                                return conn.getRegionLocator(table.getName()).getRegionLocation(row, false);
+                            } catch (Throwable throwable) {
+                                logger.warn("error when get region server name", throwable);
+                            }
+                            return null;
+                        }
+                    }, new Batch.Callback<CubeVisitResponse>() {
+                        @Override
+                        public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+                            if (result == null) {
+                                return;
+                            }
+                            if (region == null) {
+                                return;
+                            }
+
+                            // if the query is stopped, skip further processing
+                            // this may be caused by
+                            //      * Any other region has responded with error
+                            //      * ServerRpcController.failedOnException
+                            //      * ResourceLimitExceededException
+                            //      * Exception happened during CompressionUtils.decompress()
+                            //      * Outside exceptions, like KylinTimeoutException in SequentialCubeTupleIterator
+                            if (queryContext.isStopped()) {
+                                return;
+                            }
+
+                            logger.info(logHeader + getStatsString(region, result));
+
+                            Stats stats = result.getStats();
+                            queryContext.addAndGetScannedRows(stats.getScannedRowCount());
+                            queryContext.addAndGetScannedBytes(stats.getScannedBytes());
+
+                            RuntimeException rpcException = null;
+                            if (result.getStats().getNormalComplete() != 1) {
+                                // record coprocessor error if happened
+                                rpcException = getCoprocessorException(result);
+                            }
+                            queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
+                                    cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
+                                    cuboid.getId(), storageContext.getFilterMask(), rpcException,
+                                    stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
+                                    stats.getScannedRowCount(),
+                                    stats.getScannedRowCount() - stats.getAggregatedRowCount()
+                                            - stats.getFilteredRowCount(),
+                                    stats.getAggregatedRowCount(), stats.getScannedBytes());
+
+                            if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+                                rpcException = new ResourceLimitExceededException(
+                                        "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
+                                                + cubeSeg.getConfig().getQueryMaxScanBytes());
+                            }
+
+                            if (rpcException != null) {
+                                queryContext.stop(rpcException);
+                                return;
+                            }
+
+                            try {
+                                if (compressionResult) {
+                                    epResultItr.append(CompressionUtils.decompress(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                } else {
+                                    epResultItr.append(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                }
+                            } catch (IOException | DataFormatException e) {
+                                throw new RuntimeException(logHeader + "Error when decompressing", e);
+                            }
+                        }
+                    });
+
+        } catch (Throwable ex) {
+            queryContext.stop(ex);
+        }
+
+        if (queryContext.isStopped()) {
+            logger.error(logHeader + "Error when visiting cubes by endpoint", queryContext.getThrowable()); // double log coz the query thread may already timeout
+        }
+    }
+
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
         ByteString scanRequestByteString;
         int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;

http://git-wip-us.apache.org/repos/asf/kylin/blob/fcac5fcd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 60d85b4..2cb0c7f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,19 +24,21 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.gridtable.GTScanRequest;
 
 import com.google.common.base.Throwables;
 
 class ExpectedSizeIterator implements Iterator<byte[]> {
-    private BlockingQueue<byte[]> queue;
-    private int expectedSize;
+    private final QueryContext queryContext;
+    private final int expectedSize;
+    private final BlockingQueue<byte[]> queue;
+    private final long coprocessorTimeout;
+    private final long deadline;
     private int current = 0;
-    private long coprocessorTimeout;
-    private long deadline;
-    private volatile Throwable coprocException;
 
-    public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
+    public ExpectedSizeIterator(QueryContext queryContext, int expectedSize, long coprocessorTimeout) {
+        this.queryContext = queryContext;
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
@@ -59,14 +61,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             current++;
 
             byte[] ret = null;
-            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+            while (ret == null && deadline > System.currentTimeMillis()) {
+                checkState();
                 ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
-            if (coprocException != null) {
-                throw Throwables.propagate(coprocException);
-            }
-
             if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
                         GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
@@ -85,6 +84,8 @@
     }
 
     public void append(byte[] data) {
+        checkState();
+
         try {
             queue.put(data);
         } catch (InterruptedException e) {
@@ -93,7 +94,14 @@
         }
     }
 
-    public void notifyCoprocException(Throwable ex) {
-        coprocException = ex;
+    private void checkState() {
+        if (queryContext.isStopped()) {
+            Throwable throwable = queryContext.getThrowable();
+            if (throwable != null) {
+                throw Throwables.propagate(throwable);
+            } else {
+                throw new IllegalStateException("the query is stopped: " + queryContext.getStopReason());
+            }
+        }
     }
 }
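
----------------------------------------------------------------------

For readers who want the shape of the new error-handling path without tracing the diff: the per-range AtomicReference plus notifyCoprocException() mechanism is replaced by a shared query context that is stopped exactly once on the first failure, and that both the region workers and the result iterator consult. The sketch below is a minimal, self-contained illustration of that pattern; SimpleQueryContext and SimpleResultIterator are hypothetical stand-ins, not Kylin's actual QueryContext, QueryContextManager, or ExpectedSizeIterator APIs, and the method names are assumptions made for this example only.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified query context: the first recorded failure wins and is visible to all threads. */
class SimpleQueryContext {
    interface StopListener {
        void stop(SimpleQueryContext query);
    }

    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private final List<StopListener> stopListeners = new CopyOnWriteArrayList<>();

    boolean isStopped() {
        return failure.get() != null;
    }

    Throwable getThrowable() {
        return failure.get();
    }

    void addQueryStopListener(StopListener listener) {
        stopListeners.add(listener);
    }

    /** Records the first error only, then notifies listeners (e.g. so in-flight RPC threads can be interrupted). */
    void stop(Throwable t) {
        if (failure.compareAndSet(null, t)) {
            for (StopListener listener : stopListeners) {
                listener.stop(this);
            }
        }
    }
}

/** Hypothetical, simplified result iterator: polls a bounded queue but fails fast once the context is stopped. */
class SimpleResultIterator implements Iterator<byte[]> {
    private final SimpleQueryContext queryContext;
    private final BlockingQueue<byte[]> queue;
    private final int expectedSize;
    private final long deadline;
    private int current = 0;

    SimpleResultIterator(SimpleQueryContext queryContext, int expectedSize, long timeoutMillis) {
        this.queryContext = queryContext;
        this.expectedSize = expectedSize;
        this.queue = new ArrayBlockingQueue<>(expectedSize);
        this.deadline = System.currentTimeMillis() + timeoutMillis;
    }

    /** Called by worker threads; a stopped query refuses further results. */
    void append(byte[] data) throws InterruptedException {
        checkState();
        queue.put(data);
    }

    @Override
    public boolean hasNext() {
        return current < expectedSize;
    }

    @Override
    public byte[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        current++;
        try {
            byte[] ret = null;
            while (ret == null && System.currentTimeMillis() < deadline) {
                checkState(); // surfaces a worker's exception on the query thread instead of waiting for the timeout
                ret = queue.poll(1000, TimeUnit.MILLISECONDS);
            }
            if (ret == null) {
                throw new RuntimeException("Timeout waiting for region results");
            }
            return ret;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void checkState() {
        if (queryContext.isStopped()) {
            throw new RuntimeException("query stopped", queryContext.getThrowable());
        }
    }
}

The design point mirrored from the patch is that any failure — a coprocessor error, an exceeded scan-bytes limit, a decompression problem, or an exception raised outside the storage layer — stops the whole query through one shared flag, so the waiting query thread fails fast instead of blocking until the coprocessor timeout expires.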