This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3158efa387 Use stats cache on error instead of the chained mechanism 
(#15992)
3158efa387 is described below

commit 3158efa3874285e16591f775be0290be0e4093d7
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Mon Jun 16 14:22:46 2025 +0200

    Use stats cache on error instead of the chained mechanism (#15992)
---
 .../query/service/dispatch/QueryDispatcher.java    | 30 +++++++++++++++-------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index c452b9cbe3..c0c31e034a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -164,18 +164,23 @@ public class QueryDispatcher {
       throws Exception {
     long requestId = context.getRequestId();
     Set<QueryServerInstance> servers = new HashSet<>();
+    boolean cancelled = false;
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
-      return runReducer(requestId, dispatchableSubPlan, timeoutMs, 
queryOptions, _mailboxService);
+      QueryResult result = runReducer(requestId, dispatchableSubPlan, 
timeoutMs, queryOptions, _mailboxService);
+      if (result.getProcessingException() != null) {
+        MultiStageQueryStats statsFromCancel = cancelWithStats(requestId, 
servers);
+        cancelled = true;
+        return result.withStats(statsFromCancel);
+      }
+      return result;
     } catch (Exception ex) {
-      return tryRecover(context.getRequestId(), servers, ex);
-    } catch (Throwable e) {
-      // TODO: Consider always cancel when it returns (early terminate)
-      cancel(requestId);
-      throw e;
+      QueryResult queryResult = tryRecover(context.getRequestId(), servers, 
ex);
+      cancelled = true;
+      return queryResult;
     } finally {
-      if (isQueryCancellationEnabled()) {
-        _serversByQuery.remove(requestId);
+      if (!cancelled) {
+        cancel(requestId, servers);
       }
     }
   }
@@ -199,7 +204,6 @@ public class QueryDispatcher {
       errorCode = ((QueryException) ex).getErrorCode();
     } else {
       // in case of unknown exceptions, the exception will be rethrown, so we 
don't need stats
-      cancel(requestId, servers);
       throw ex;
     }
     // in case of known exceptions (timeout or query exception), we need can 
build here the erroneous QueryResult
@@ -788,6 +792,14 @@ public class QueryDispatcher {
       }
     }
 
+    public QueryResult withStats(MultiStageQueryStats newQueryStats) {
+      if (_processingException != null) {
+        return new QueryResult(_processingException, newQueryStats, 
_brokerReduceTimeMs);
+      } else {
+        return new QueryResult(_resultTable, newQueryStats, 
_brokerReduceTimeMs);
+      }
+    }
+
     @Nullable
     public ResultTable getResultTable() {
       return _resultTable;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to