KYLIN-2443 Report coprocessor error information back to client
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/43c05667 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/43c05667 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/43c05667 Branch: refs/heads/master Commit: 43c0566728092d537201d751d3e8f6e3c0d5f051 Parents: 707b055 Author: gaodayue <gaoda...@meituan.com> Authored: Sat Feb 11 19:13:24 2017 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Sun Feb 12 09:34:49 2017 +0800 ---------------------------------------------------------------------- build/smoke-test/sql/sql1.json | 1 - build/smoke-test/testQuery.py | 1 + .../org/apache/kylin/common/QueryContext.java | 21 +- .../exceptions/KylinTimeoutException.java | 26 + .../ResourceLimitExceededException.java | 30 + .../kylin/gridtable/GTAggregateScanner.java | 12 +- .../GTScanExceedThresholdException.java | 26 - .../GTScanSelfTerminatedException.java | 30 - .../kylin/gridtable/GTScanTimeoutException.java | 26 - .../apache/kylin/storage/StorageContext.java | 19 +- .../storage/gtrecord/CubeSegmentScanner.java | 2 +- .../kylin/storage/gtrecord/ScannerWorker.java | 5 +- .../gtrecord/SequentialCubeTupleIterator.java | 6 +- .../apache/kylin/query/ITKylinQueryTest.java | 6 +- .../apache/kylin/rest/service/QueryService.java | 28 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 69 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 8 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 15 +- .../coprocessor/endpoint/CubeVisitService.java | 42 +- .../endpoint/generated/CubeVisitProtos.java | 1254 ++++++++++++++++-- .../endpoint/protobuf/CubeVisit.proto | 12 +- 21 files changed, 1331 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/build/smoke-test/sql/sql1.json ---------------------------------------------------------------------- diff --git a/build/smoke-test/sql/sql1.json b/build/smoke-test/sql/sql1.json index 7cb3258..21e4c01 100644 --- a/build/smoke-test/sql/sql1.json +++ b/build/smoke-test/sql/sql1.json @@ -9,7 +9,6 @@ ] ], "exceptionMessage": null, - "totalScanCount": 1, "columnMetas": [ { "scale": 0, http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/build/smoke-test/testQuery.py ---------------------------------------------------------------------- diff --git a/build/smoke-test/testQuery.py b/build/smoke-test/testQuery.py index 87a2456..99c09d3 100644 --- a/build/smoke-test/testQuery.py +++ b/build/smoke-test/testQuery.py @@ -59,6 +59,7 @@ class testQuery(unittest.TestCase): del actual_result['duration'] del actual_result['hitExceptionCache'] del actual_result['storageCacheUsed'] + del actual_result['totalScanCount'] del actual_result['totalScanBytes'] expect_result = json.loads(open(sql_file[:-4] + '.json').read().strip()) http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 3a73993..67925b6 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -21,7 +21,7 @@ package org.apache.kylin.common; import java.util.concurrent.atomic.AtomicLong; /** - * checkout {@link org.apache.kylin.common.debug.BackdoorToggles} for comparison + * Holds per query information and statistics. */ public class QueryContext { @@ -33,7 +33,8 @@ public class QueryContext { }; private String queryId; - private AtomicLong scanBytes = new AtomicLong(); + private AtomicLong scannedRows = new AtomicLong(); + private AtomicLong scannedBytes = new AtomicLong(); private QueryContext() { // use QueryContext.current() instead @@ -55,11 +56,19 @@ public class QueryContext { this.queryId = queryId; } - public long getScanBytes() { - return scanBytes.get(); + public long getScannedRows() { + return scannedRows.get(); } - public long addAndGetScanBytes(long delta) { - return scanBytes.addAndGet(delta); + public long addAndGetScannedRows(long deltaRows) { + return scannedRows.addAndGet(deltaRows); + } + + public long getScannedBytes() { + return scannedBytes.get(); + } + + public long addAndGetScannedBytes(long deltaBytes) { + return scannedBytes.addAndGet(deltaBytes); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java b/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java new file mode 100644 index 0000000..75d981f --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/exceptions/KylinTimeoutException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.exceptions; + +public class KylinTimeoutException extends RuntimeException { + + public KylinTimeoutException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java b/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java new file mode 100644 index 0000000..df5d88e --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/exceptions/ResourceLimitExceededException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.exceptions; + +public class ResourceLimitExceededException extends RuntimeException { + + public ResourceLimitExceededException(String message) { + super(message); + } + + public ResourceLimitExceededException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index dd359f8..8b0efcc 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -35,6 +35,7 @@ import java.util.SortedMap; import java.util.Map.Entry; import org.apache.commons.io.IOUtils; +import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; @@ -278,14 +279,11 @@ public class GTAggregateScanner implements IGTScanner { final long estMemSize = estimatedMemSize(); if (spillThreshold > 0 && estMemSize > spillThreshold) { - // spill to disk when aggBufMap used too large memory - if (spillEnabled) { - spillBuffMap(estMemSize); - aggBufMap = createBuffMap(); - - } else { - throw new GTScanSelfTerminatedException("Aggregation using more than " + spillThreshold + " memory and spill is disabled"); + if (!spillEnabled) { + throw new ResourceLimitExceededException("aggregation's memory consumption " + estMemSize + " exceeds threshold " + spillThreshold); } + spillBuffMap(estMemSize); // spill to disk + aggBufMap = createBuffMap(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java deleted file mode 100644 index ba75962..0000000 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.gridtable; - -public class GTScanExceedThresholdException extends GTScanSelfTerminatedException { - - public GTScanExceedThresholdException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java deleted file mode 100644 index 30d3aaa..0000000 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanSelfTerminatedException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.gridtable; - -/** - * Implementations of {@link IGTScanner} should throw {@link GTScanSelfTerminatedException} or its subclasses - * in cases where the scan runs out of resources (time, memory, etc) and can not be continued. - */ -public class GTScanSelfTerminatedException extends RuntimeException { - - public GTScanSelfTerminatedException(String s) { - super(s); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java deleted file mode 100644 index 17a8d02..0000000 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.gridtable; - -public class GTScanTimeoutException extends GTScanSelfTerminatedException { - - public GTScanTimeoutException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 0f52c53..4713d71 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -47,8 +47,7 @@ public class StorageContext { private boolean enableCoprocessor = false; private IStorageQuery storageQuery; - private AtomicLong totalScanCount = new AtomicLong(); - private AtomicLong totalScanBytes = new AtomicLong(); + private AtomicLong processedRowCount = new AtomicLong(); private Cuboid cuboid; private boolean partialResultReturned = false; @@ -140,20 +139,12 @@ public class StorageContext { return cuboid; } - public long getTotalScanCount() { - return totalScanCount.get(); + public long getProcessedRowCount() { + return processedRowCount.get(); } - public long increaseTotalScanCount(long count) { - return this.totalScanCount.addAndGet(count); - } - - public long getTotalScanBytes() { - return totalScanBytes.get(); - } - - public long increaseTotalScanBytes(long bytes) { - return totalScanBytes.addAndGet(bytes); + public long increaseProcessedRowCount(long count) { + return processedRowCount.addAndGet(count); } public boolean isAcceptPartialResult() { http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index 974b8ea..029502c 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner { } scanRequest = scanRangePlanner.planScanRequest(); String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); - scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context); + scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java index 2a2a86a..fd50c54 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java @@ -26,7 +26,6 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStorage; import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.storage.StorageContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ public class ScannerWorker { private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); private IGTScanner internal = null; - public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) { + public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) { if (scanRequest == null) { logger.info("Segment {} will be skipped", segment); internal = new EmptyGTScanner(0); @@ -49,7 +48,7 @@ public class ScannerWorker { final GTInfo info = scanRequest.getInfo(); try { - IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior + IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior internal = rpc.getGTScanner(scanRequest); } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index bb2d7f9..14b6394 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -26,8 +26,8 @@ import java.util.Set; import javax.annotation.Nullable; +import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; @@ -139,7 +139,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { if (scanCount++ % 100 == 1 && System.currentTimeMillis() > context.getDeadline()) { - throw new GTScanTimeoutException("Query Timeout!"); + throw new KylinTimeoutException("Query timeout after \"kylin.query.timeout-seconds\" seconds"); } if (++scanCountDelta >= 1000) @@ -173,7 +173,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { } private void flushScanCountDelta() { - context.increaseTotalScanCount(scanCountDelta); + context.increaseProcessedRowCount(scanCountDelta); scanCountDelta = 0; } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 87ddcb8..4590e60 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.gridtable.GTScanSelfTerminatedException; +import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.gridtable.StorageSideBehavior; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.routing.Candidate; @@ -124,12 +124,12 @@ public class ITKylinQueryTest extends KylinTestBase { System.out.println(e.getMessage()); - if (findRoot(e) instanceof GTScanSelfTerminatedException) { + if (findRoot(e) instanceof KylinTimeoutException) { //expected continue; } } - throw new RuntimeException("Expecting GTScanTimeoutException"); + throw new RuntimeException("Expecting KylinTimeoutException"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 7d9e24d..4c02aa4 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -57,13 +57,13 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.metadata.project.RealizationEntry; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.relnode.OLAPContext; @@ -328,10 +328,12 @@ public class QueryService extends BasicService { throw new InternalErrorException("Project cannot be empty. Please select a project."); } - final String queryId = UUID.randomUUID().toString(); if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); - QueryContext.current().setQueryId(queryId); + + final QueryContext queryContext = QueryContext.current(); + final String queryId = UUID.randomUUID().toString(); + queryContext.setQueryId(queryId); try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { String sql = sqlRequest.getSql(); @@ -372,6 +374,8 @@ public class QueryService extends BasicService { } else { sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setTotalScanCount(0); + sqlResponse.setTotalScanBytes(0); } checkQueryAuth(sqlResponse); @@ -381,9 +385,10 @@ public class QueryService extends BasicService { String errMsg = QueryUtil.makeErrorMsgUserFriendly(e); sqlResponse = new SQLResponse(null, null, 0, true, errMsg); + sqlResponse.setTotalScanCount(queryContext.getScannedRows()); + sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); - // for exception queries, only cache ScanOutOfLimitException - if (queryCacheEnabled && e instanceof GTScanExceedThresholdException) { + if (queryCacheEnabled && e.getCause() != null && e.getCause() instanceof ResourceLimitExceededException) { Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); exceptionCache.put(new Element(sqlRequest, sqlResponse)); } @@ -582,26 +587,21 @@ public class QueryService extends BasicService { boolean isPartialResult = false; String cube = ""; - StringBuilder sb = new StringBuilder("Scan stats for each storageContext: "); - long totalScanCount = 0; - long totalScanBytes = 0; + StringBuilder sb = new StringBuilder("Processed rows for each storageContext: "); if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { if (ctx.realization != null) { isPartialResult |= ctx.storageContext.isPartialResultReturned(); cube = ctx.realization.getName(); - totalScanCount += ctx.storageContext.getTotalScanCount(); - totalScanBytes += ctx.storageContext.getTotalScanBytes(); - sb.append("{rows=").append(ctx.storageContext.getTotalScanCount()). - append(" bytes=").append(ctx.storageContext.getTotalScanBytes()).append("} "); + sb.append(ctx.storageContext.getProcessedRowCount()).append(" "); } } } logger.info(sb.toString()); SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult); - response.setTotalScanCount(totalScanCount); - response.setTotalScanBytes(totalScanBytes); + response.setTotalScanCount(QueryContext.current().getScannedRows()); + response.setTotalScanBytes(QueryContext.current().getScannedBytes()); return response; } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 68a84c1..3c01da2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.zip.DataFormatException; import org.apache.hadoop.hbase.TableName; @@ -33,7 +34,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; @@ -43,12 +45,9 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer; import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; @@ -71,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private static ExecutorService executorService = new LoggableCachedThreadPool(); - public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { - super(segment, cuboid, fullGTInfo, context); + public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { + super(segment, cuboid, fullGTInfo); } private byte[] getByteArrayForShort(short v) { @@ -107,8 +106,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @SuppressWarnings("checkstyle:methodlength") @Override public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException { - final QueryContext queryContext = QueryContext.current(); - Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard(); short shardNum = shardNumAndBaseShard.getFirst(); short cuboidBaseShard = shardNumAndBaseShard.getSecond(); @@ -175,7 +172,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { public void run() { final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest))); - final boolean[] abnormalFinish = new boolean[1]; + final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>(); try { Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool()); @@ -199,22 +196,32 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }, new Batch.Callback<CubeVisitResponse>() { @Override public void update(byte[] region, byte[] row, CubeVisitResponse result) { - if (region == null) + if (region == null) { return; + } - final long scanBytes = result.getStats().getScannedBytes(); - context.increaseTotalScanBytes(scanBytes); - totalScannedCount.addAndGet(result.getStats().getScannedRowCount()); logger.info(logHeader + getStatsString(region, result)); - if (queryContext.addAndGetScanBytes(scanBytes) > cubeSeg.getConfig().getQueryMaxScanBytes()) { - throw new GTScanExceedThresholdException("Query scanned " + queryContext.getScanBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes()); + Stats stats = result.getStats(); + queryContext.addAndGetScannedRows(stats.getScannedRowCount()); + queryContext.addAndGetScannedBytes(stats.getScannedBytes()); + totalScannedCount.addAndGet(stats.getScannedRowCount()); + + // if any other region has responded with error, skip further processing + if (regionErrorHolder.get() != null) { + return; } + // record coprocessor error if happened if (result.getStats().getNormalComplete() != 1) { - abnormalFinish[0] = true; + regionErrorHolder.compareAndSet(null, getCoprocessorException(result)); return; } + + if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) { + throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes()); + } + try { if (compressionResult) { epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()))); @@ -233,11 +240,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { return; } - if (abnormalFinish[0]) { - Throwable ex = new GTScanSelfTerminatedException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query..."); - logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout - epResultItr.notifyCoprocException(ex); - return; + if (regionErrorHolder.get() != null) { + RuntimeException exception = regionErrorHolder.get(); + logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout + epResultItr.notifyCoprocException(exception); } } }); @@ -288,6 +294,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private String getStatsString(byte[] region, CubeVisitResponse result) { StringBuilder sb = new StringBuilder(); Stats stats = result.getStats(); + byte[] compressedRows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()); + sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append("."); sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". "); sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". "); @@ -296,8 +304,27 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append("."); sb.append("Etc message: ").append(stats.getEtcMsg()).append("."); sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append("."); + sb.append("Compressed row size: ").append(compressedRows.length); return sb.toString(); } + private RuntimeException getCoprocessorException(CubeVisitResponse response) { + if (!response.hasErrorInfo()) { + return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message"); + } + + CubeVisitResponse.ErrorInfo errorInfo = response.getErrorInfo(); + + switch (errorInfo.getType()) { + case UNKNOWN_TYPE: + return new RuntimeException("Coprocessor aborts: " + errorInfo.getMessage()); + case TIMEOUT: + return new KylinTimeoutException(errorInfo.getMessage()); + case RESOURCE_LIMIT_EXCEEDED: + return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + errorInfo.getMessage()); + default: + throw new AssertionError("Unknown error type: " + errorInfo.getType()); + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 11fbb19..f24290c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; @@ -48,7 +49,6 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.IGTStorage; -import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,18 +63,18 @@ public abstract class CubeHBaseRPC implements IGTStorage { final protected CubeSegment cubeSeg; final protected Cuboid cuboid; final protected GTInfo fullGTInfo; - final protected StorageContext context; + final protected QueryContext queryContext; final private RowKeyEncoder fuzzyKeyEncoder; final private RowKeyEncoder fuzzyMaskEncoder; - public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) { + public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) { Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment"); this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; - this.context = context; + this.queryContext = QueryContext.current(); this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index b94346c..1698180 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -42,7 +42,6 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,8 +87,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } } - public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) { - super(segment, cuboid, fullGTInfo, context); + public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) { + super(segment, cuboid, fullGTInfo); } @Override @@ -182,15 +181,18 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator()); CellListIterator cellListIterator = new CellListIterator() { - long scanBytes = 0; + long scannedRows = 0; + long scannedBytes = 0; @Override public void close() throws IOException { + queryContext.addAndGetScannedRows(scannedRows); + queryContext.addAndGetScannedBytes(scannedBytes); + for (ResultScanner scanner : scanners) { scanner.close(); } hbaseTable.close(); - context.increaseTotalScanBytes(scanBytes); } @Override @@ -202,8 +204,9 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { public List<Cell> next() { List<Cell> result = allResultsIterator.next().listCells(); for (Cell cell : result) { - scanBytes += CellUtil.estimatedSizeOf(cell); + scannedBytes += CellUtil.estimatedSizeOf(cell); } + scannedRows++; return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/43c05667/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 5fd9740..e18ff0d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -31,7 +31,6 @@ import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; @@ -44,16 +43,15 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.GTScanExceedThresholdException; import org.apache.kylin.gridtable.GTScanRequest; -import org.apache.kylin.gridtable.GTScanSelfTerminatedException; -import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.gridtable.IGTStore; import org.apache.kylin.gridtable.StorageSideBehavior; @@ -165,13 +163,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement @Override public boolean hasNext() { if (rowCount > rowCountLimit) { - throw new GTScanExceedThresholdException("Number of rows scanned exceeds threshold " + rowCountLimit); + throw new ResourceLimitExceededException("scanned row count exceeds threshold " + rowCountLimit); } if (rowBytes > bytesLimit) { - throw new GTScanExceedThresholdException("Scanned " + rowBytes + " bytes exceeds threshold " + bytesLimit); + throw new ResourceLimitExceededException("scanned bytes " + rowBytes + " exceeds threshold " + bytesLimit); } if ((rowCount % GTScanRequest.terminateCheckInterval == 1) && System.currentTimeMillis() > deadline) { - throw new GTScanTimeoutException("Scan timeout after " + timeout + " ms"); + throw new KylinTimeoutException("coprocessor timeout after " + timeout + " ms"); } return delegate.hasNext(); } @@ -232,6 +230,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement byte[] allRows; String debugGitTag = ""; + CubeVisitProtos.CubeVisitResponse.ErrorInfo errorInfo = null; + String queryId = request.hasQueryId() ? request.getQueryId() : "UnknownId"; try (SetThreadName ignored = new SetThreadName("Query %s", queryId)) { this.serviceStartTime = System.currentTimeMillis(); @@ -292,7 +292,6 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement scanReq.disableAggCacheMemCheck(); // disable mem check if so told } - final MutableBoolean scanNormalComplete = new MutableBoolean(true); final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); ResourceTrackingCellListIterator cellListIterator = new ResourceTrackingCellListIterator( @@ -332,11 +331,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement break; } } - } catch (GTScanSelfTerminatedException e) { - // the query is using too much resource, we mark it as abnormal finish instead of - // throwing RuntimeException to avoid client retrying RPC. - scanNormalComplete.setValue(false); - logger.warn("Abort scan: {}", e.getMessage()); + } catch (KylinTimeoutException e) { + logger.info("Abort scan: {}", e.getMessage()); + errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() + .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.TIMEOUT) + .setMessage(e.getMessage()) + .build(); + } catch (ResourceLimitExceededException e) { + logger.info("Abort scan: {}", e.getMessage()); + errorInfo = CubeVisitProtos.CubeVisitResponse.ErrorInfo.newBuilder() + .setType(CubeVisitProtos.CubeVisitResponse.ErrorType.RESOURCE_LIMIT_EXCEEDED) + .setMessage(e.getMessage()) + .build(); } finally { finalScanner.close(); } @@ -347,7 +353,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement //outputStream.close() is not necessary byte[] compressedAllRows; - if (scanNormalComplete.booleanValue()) { + if (errorInfo == null) { allRows = outputStream.toByteArray(); } else { allRows = new byte[0]; @@ -370,6 +376,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement sb.append(" debugGitTag:" + debugGitTag); CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder(); + if (errorInfo != null) { + responseBuilder.setErrorInfo(errorInfo); + } done.run(responseBuilder.// setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder(). @@ -383,9 +392,8 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement setFreeSwapSpaceSize(freeSwapSpaceSize). setHostname(InetAddress.getLocalHost().getHostName()). setEtcMsg(sb.toString()). - setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build()) - .// - build()); + setNormalComplete(errorInfo == null ? 1 : 0).build()) + .build()); } catch (IOException ioe) { logger.error(ioe.toString(), ioe);