This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9d3a671300d Improve broker error messaging when broker is the one reporting the failure (#16076) 9d3a671300d is described below commit 9d3a671300d9fae776fb35be0f8b6500827d6c62 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Tue Jun 17 09:05:47 2025 +0200 Improve broker error messaging when broker is the one reporting the failure (#16076) --- .../core/query/reduce/GroupByDataTableReducer.java | 2 +- .../apache/pinot/query/mailbox/ReceivingMailbox.java | 18 ++++++++++++------ .../pinot/query/runtime/blocks/ErrorMseBlock.java | 4 +--- .../pinot/query/service/dispatch/QueryDispatcher.java | 17 +++++++++++------ 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 1d83f7190da..2072e937deb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -356,7 +356,7 @@ public class GroupByDataTableReducer implements DataTableReducer { try { long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start); if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("Timed out in broker reduce phase"); + throw new TimeoutException("Timed out on broker reduce phase"); } Throwable t = exception.get(); if (t != null) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java index 4e9736d4f5b..6588e1cf76a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java +++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java @@ -61,8 +61,8 @@ public class ReceivingMailbox { public static final int DEFAULT_MAX_PENDING_BLOCKS = 5; private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class); - private static final MseBlockWithStats CANCELLED_ERROR_BLOCK = new MseBlockWithStats( - ErrorMseBlock.fromException(new RuntimeException("Cancelled by receiver")), Collections.emptyList()); + // This was previously a static final attribute, but now that includes server and stage, we cannot use constants + private volatile MseBlockWithStats _cancelledErrorBlock; private final String _id; // TODO: Make the queue size configurable @@ -152,7 +152,7 @@ public class ReceivingMailbox { MseBlockWithStats errorBlock = _errorBlock.get(); if (errorBlock != null) { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); - return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; + return errorBlock == _cancelledErrorBlock ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; } if (timeoutMs <= 0) { LOGGER.debug("Mailbox: {} is already timed out", _id); @@ -177,7 +177,7 @@ public class ReceivingMailbox { } else { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); _blocks.clear(); - return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; + return errorBlock == _cancelledErrorBlock ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; } } else { LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs); @@ -233,8 +233,14 @@ public class ReceivingMailbox { */ public void cancel() { LOGGER.debug("Cancelling mailbox: {}", _id); - if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) { - _blocks.clear(); + if (_errorBlock.get() == null) { + MseBlockWithStats errorBlock = new MseBlockWithStats( + ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled by receiver"), + Collections.emptyList()); + if (_errorBlock.compareAndSet(null, errorBlock)) { + _cancelledErrorBlock = errorBlock; + _blocks.clear(); + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java index e07dad85819..6ae8b15e1d8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ErrorMseBlock.java @@ -61,15 +61,13 @@ public class ErrorMseBlock implements MseBlock.Eos { public static ErrorMseBlock fromMap(Map<QueryErrorCode, String> errorMessages) { int stage; int worker; - String server; + String server = QueryThreadContext.isInitialized() ? 
QueryThreadContext.getInstanceId() : "unknown"; if (MseWorkerThreadContext.isInitialized()) { stage = MseWorkerThreadContext.getStageId(); worker = MseWorkerThreadContext.getWorkerId(); - server = QueryThreadContext.getInstanceId(); } else { stage = -1; // Default value when not initialized worker = -1; // Default value when not initialized - server = null; // Default value when not initialized } return new ErrorMseBlock(stage, worker, server, errorMessages); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index c0c31e034af..d7ef6790d25 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -64,6 +64,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.DataBlockExtractUtils; import org.apache.pinot.core.util.trace.TracedThreadFactory; +import org.apache.pinot.query.MseWorkerThreadContext; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; @@ -614,7 +615,11 @@ public class QueryDispatcher { ArrayList<Object[]> resultRows = new ArrayList<>(); MseBlock block; MultiStageQueryStats queryStats; - try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) { + try ( + QueryThreadContext.CloseableContext mseCloseableCtx = MseWorkerThreadContext.open(); + OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) { + MseWorkerThreadContext.setStageId(0); + MseWorkerThreadContext.setWorkerId(0); MultiStageOperator rootOperator = opChain.getRoot(); block = 
rootOperator.nextBlock(); while (block.isData()) { @@ -648,21 +653,21 @@ public class QueryDispatcher { Map.Entry<QueryErrorCode, String> error; String from; if (errorBlock.getStageId() >= 0) { - from = "from stage " + errorBlock.getStageId(); + from = " from stage " + errorBlock.getStageId(); if (errorBlock.getServerId() != null) { - from += " on server " + errorBlock.getServerId(); + from += " on " + errorBlock.getServerId(); } } else { - from = "from servers"; + from = ""; } if (queryExceptions.size() == 1) { error = queryExceptions.entrySet().iterator().next(); - errorMessage = "Received 1 error " + from + ": " + error.getValue(); + errorMessage = "Received 1 error" + from + ": " + error.getValue(); } else { error = queryExceptions.entrySet().stream() .max(QueryDispatcher::compareErrors) .orElseThrow(); - errorMessage = "Received " + queryExceptions.size() + " errors " + from + ". " + errorMessage = "Received " + queryExceptions.size() + " errors" + from + ". " + "The one with highest priority is: " + error.getValue(); } QueryProcessingException processingEx = new QueryProcessingException(error.getKey().getId(), errorMessage); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org