This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3158efa387 Use stats cache on error instead of the chained mechanism (#15992) 3158efa387 is described below commit 3158efa3874285e16591f775be0290be0e4093d7 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Mon Jun 16 14:22:46 2025 +0200 Use stats cache on error instead of the chained mechanism (#15992) --- .../query/service/dispatch/QueryDispatcher.java | 30 +++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index c452b9cbe3..c0c31e034a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -164,18 +164,23 @@ public class QueryDispatcher { throws Exception { long requestId = context.getRequestId(); Set<QueryServerInstance> servers = new HashSet<>(); + boolean cancelled = false; try { submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions); - return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); + QueryResult result = runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); + if (result.getProcessingException() != null) { + MultiStageQueryStats statsFromCancel = cancelWithStats(requestId, servers); + cancelled = true; + return result.withStats(statsFromCancel); + } + return result; } catch (Exception ex) { - return tryRecover(context.getRequestId(), servers, ex); - } catch (Throwable e) { - // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId); - throw e; + QueryResult queryResult = tryRecover(context.getRequestId(), servers, ex); + cancelled = true; + return queryResult; } finally { - if (isQueryCancellationEnabled()) { - _serversByQuery.remove(requestId); + if (!cancelled) { + cancel(requestId, servers); } } } @@ -199,7 +204,6 @@ public class QueryDispatcher { errorCode = ((QueryException) ex).getErrorCode(); } else { // in case of unknown exceptions, the exception will be rethrown, so we don't need stats - cancel(requestId, servers); throw ex; } // in case of known exceptions (timeout or query exception), we need can build here the erroneous QueryResult @@ -788,6 +792,14 @@ public class QueryDispatcher { } } + public QueryResult withStats(MultiStageQueryStats newQueryStats) { + if (_processingException != null) { + return new QueryResult(_processingException, newQueryStats, _brokerReduceTimeMs); + } else { + return new QueryResult(_resultTable, newQueryStats, _brokerReduceTimeMs); + } + } + @Nullable public ResultTable getResultTable() { return _resultTable; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org