This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d3a671300d Improve broker error messaging when broker is the one reporting the failure (#16076)
9d3a671300d is described below

commit 9d3a671300d9fae776fb35be0f8b6500827d6c62
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Tue Jun 17 09:05:47 2025 +0200

    Improve broker error messaging when broker is the one reporting the failure (#16076)
---
 .../core/query/reduce/GroupByDataTableReducer.java     |  2 +-
 .../apache/pinot/query/mailbox/ReceivingMailbox.java   | 18 ++++++++++++------
 .../pinot/query/runtime/blocks/ErrorMseBlock.java      |  4 +---
 .../pinot/query/service/dispatch/QueryDispatcher.java  | 17 +++++++++++------
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 1d83f7190da..2072e937deb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -356,7 +356,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
     try {
       long timeOutMs = reducerContext.getReduceTimeOutMs() - 
(System.currentTimeMillis() - start);
       if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
-        throw new TimeoutException("Timed out in broker reduce phase");
+        throw new TimeoutException("Timed out on broker reduce phase");
       }
       Throwable t = exception.get();
       if (t != null) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 4e9736d4f5b..6588e1cf76a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -61,8 +61,8 @@ public class ReceivingMailbox {
   public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ReceivingMailbox.class);
-  private static final MseBlockWithStats CANCELLED_ERROR_BLOCK = new 
MseBlockWithStats(
-      ErrorMseBlock.fromException(new RuntimeException("Cancelled by 
receiver")), Collections.emptyList());
+  // This was previously a static final attribute, but now that includes 
server and stage, we cannot use constants
+  private volatile MseBlockWithStats _cancelledErrorBlock;
 
   private final String _id;
   // TODO: Make the queue size configurable
@@ -152,7 +152,7 @@ public class ReceivingMailbox {
     MseBlockWithStats errorBlock = _errorBlock.get();
     if (errorBlock != null) {
       LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring 
the late block", _id);
-      return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
+      return errorBlock == _cancelledErrorBlock ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
     }
     if (timeoutMs <= 0) {
       LOGGER.debug("Mailbox: {} is already timed out", _id);
@@ -177,7 +177,7 @@ public class ReceivingMailbox {
         } else {
           LOGGER.debug("Mailbox: {} is already cancelled or errored out, 
ignoring the late block", _id);
           _blocks.clear();
-          return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
+          return errorBlock == _cancelledErrorBlock ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
         }
       } else {
         LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", 
_id, timeoutMs);
@@ -233,8 +233,14 @@ public class ReceivingMailbox {
    */
   public void cancel() {
     LOGGER.debug("Cancelling mailbox: {}", _id);
-    if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
-      _blocks.clear();
+    if (_errorBlock.get() == null) {
+      MseBlockWithStats errorBlock = new MseBlockWithStats(
+          ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled 
by receiver"),
+          Collections.emptyList());
+      if (_errorBlock.compareAndSet(null, errorBlock)) {
+        _cancelledErrorBlock = errorBlock;
+        _blocks.clear();
+      }
     }
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
index e07dad85819..6ae8b15e1d8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java
@@ -61,15 +61,13 @@ public class ErrorMseBlock implements MseBlock.Eos {
   public static ErrorMseBlock fromMap(Map<QueryErrorCode, String> 
errorMessages) {
     int stage;
     int worker;
-    String server;
+    String server = QueryThreadContext.isInitialized() ? 
QueryThreadContext.getInstanceId() : "unknown";
     if (MseWorkerThreadContext.isInitialized()) {
       stage = MseWorkerThreadContext.getStageId();
       worker = MseWorkerThreadContext.getWorkerId();
-      server = QueryThreadContext.getInstanceId();
     } else {
       stage = -1; // Default value when not initialized
       worker = -1; // Default value when not initialized
-      server = null; // Default value when not initialized
     }
     return new ErrorMseBlock(stage, worker, server, errorMessages);
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index c0c31e034af..d7ef6790d25 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -64,6 +64,7 @@ import 
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.DataBlockExtractUtils;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
+import org.apache.pinot.query.MseWorkerThreadContext;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
@@ -614,7 +615,11 @@ public class QueryDispatcher {
     ArrayList<Object[]> resultRows = new ArrayList<>();
     MseBlock block;
     MultiStageQueryStats queryStats;
-    try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, 
executionContext, (a, b) -> { })) {
+    try (
+        QueryThreadContext.CloseableContext mseCloseableCtx = 
MseWorkerThreadContext.open();
+        OpChain opChain = PlanNodeToOpChain.convert(rootNode, 
executionContext, (a, b) -> { })) {
+      MseWorkerThreadContext.setStageId(0);
+      MseWorkerThreadContext.setWorkerId(0);
       MultiStageOperator rootOperator = opChain.getRoot();
       block = rootOperator.nextBlock();
       while (block.isData()) {
@@ -648,21 +653,21 @@ public class QueryDispatcher {
       Map.Entry<QueryErrorCode, String> error;
       String from;
       if (errorBlock.getStageId() >= 0) {
-        from = "from stage " + errorBlock.getStageId();
+        from = " from stage " + errorBlock.getStageId();
         if (errorBlock.getServerId() != null) {
-          from += " on server " + errorBlock.getServerId();
+          from += " on " + errorBlock.getServerId();
         }
       } else {
-        from = "from servers";
+        from = "";
       }
       if (queryExceptions.size() == 1) {
         error = queryExceptions.entrySet().iterator().next();
-        errorMessage = "Received 1 error " + from + ": " + error.getValue();
+        errorMessage = "Received 1 error" + from + ": " + error.getValue();
       } else {
         error = queryExceptions.entrySet().stream()
             .max(QueryDispatcher::compareErrors)
             .orElseThrow();
-        errorMessage = "Received " + queryExceptions.size() + " errors " + 
from + ". "
+        errorMessage = "Received " + queryExceptions.size() + " errors" + from 
+ ". "
                 + "The one with highest priority is: " + error.getValue();
       }
       QueryProcessingException processingEx = new 
QueryProcessingException(error.getKey().getId(), errorMessage);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to