This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new de823db96fe Rewrite ReceivingMailbox to be able to unblock writers on cancellation or error (#16903)
de823db96fe is described below

commit de823db96fec884a290e54dec0ab8b2ab65c1d5a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 13 14:43:43 2025 +0200

    Rewrite ReceivingMailbox to be able to unblock writers on cancellation or error (#16903)
---
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  14 +-
 .../query/mailbox/InMemorySendingMailbox.java      |  49 +-
 .../apache/pinot/query/mailbox/MailboxService.java |   5 +-
 .../pinot/query/mailbox/ReceivingMailbox.java      | 602 ++++++++++++++++-----
 .../apache/pinot/query/mailbox/SendingMailbox.java |   8 +-
 .../mailbox/channel/MailboxContentObserver.java    |  66 ++-
 .../apache/pinot/query/runtime/QueryRunner.java    |  24 +-
 .../operator/BaseMailboxReceiveOperator.java       |   4 +-
 .../runtime/operator/MailboxSendOperator.java      |  31 +-
 .../query/runtime/operator/MultiStageOperator.java |   9 +
 .../runtime/operator/exchange/BlockExchange.java   |  45 +-
 .../operator/exchange/BroadcastExchange.java       |   5 +-
 .../runtime/operator/exchange/HashExchange.java    |   5 +-
 .../runtime/operator/exchange/RandomExchange.java  |   5 +-
 .../operator/exchange/SingletonExchange.java       |   5 +-
 .../utils/BlockingMultiStreamConsumer.java         |   6 +-
 .../pinot/query/mailbox/MailboxServiceTest.java    |  42 +-
 .../pinot/query/mailbox/ReceivingMailboxTest.java  | 333 ++++++++++++
 .../runtime/operator/MailboxSendOperatorTest.java  |  21 -
 .../operator/exchange/BlockExchangeTest.java       |   5 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  31 +-
 .../pinot/tools/ColocatedJoinQuickStart.java       |   2 +
 22 files changed, 999 insertions(+), 318 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 4ffdd58e684..50c4b1003eb 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -32,7 +32,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
@@ -50,6 +49,7 @@ import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,21 +89,18 @@ public class GrpcSendingMailbox implements SendingMailbox {
   }
 
   @Override
-  public void send(MseBlock.Data data)
-      throws IOException, TimeoutException {
+  public void send(MseBlock.Data data) {
     sendInternal(data, List.of());
   }
 
   @Override
-  public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
-      throws IOException, TimeoutException {
+  public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
     sendInternal(block, serializedStats);
     LOGGER.debug("Completing mailbox: {}", _id);
     _contentObserver.onCompleted();
   }
 
-  private void sendInternal(MseBlock block, List<DataBuffer> serializedStats)
-      throws IOException {
+  private void sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
       LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox. 
Skipping sending message {} to: {}",
           block, _id);
@@ -118,8 +115,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
     try {
       processAndSend(block, serializedStats);
     } catch (IOException e) {
-      LOGGER.warn("Failed to split and send mailbox", e);
-      throw e;
+      throw new QueryException(QueryErrorCode.INTERNAL, "Failed to split and 
send mailbox", e);
     }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index b566100f276..4b0166f5016 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
@@ -42,7 +43,13 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
   private final long _deadlineMs;
 
   private ReceivingMailbox _receivingMailbox;
+
+  /// Set to true when the send operation completes calling [#complete()]
   private volatile boolean _isTerminated;
+
+  /// Set to true when the receiver waits for EOS but discards any further 
data blocks.
+  /// This can happen when the receiver has already early terminated, for 
example,
+  /// when the [org.apache.pinot.query.runtime.operator.SortOperator] limit 
has been reached.
   private volatile boolean _isEarlyTerminated;
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
 
@@ -60,21 +67,19 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
   }
 
   @Override
-  public void send(MseBlock.Data data)
-      throws IOException, TimeoutException {
+  public void send(MseBlock.Data data) {
     sendPrivate(data, Collections.emptyList());
   }
 
   @Override
-  public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
-      throws IOException, TimeoutException {
+  public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
     sendPrivate(block, serializedStats);
     _isTerminated = true;
   }
 
-  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-      throws TimeoutException {
+  private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
+      LOGGER.debug("Mailbox {} already terminated, ignoring block {}", _id, 
block);
       return;
     }
     if (_receivingMailbox == null) {
@@ -82,22 +87,30 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
     }
     _statMap.merge(MailboxSendOperator.StatKey.IN_MEMORY_MESSAGES, 1);
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, serializedStats, timeoutMs);
-
+    ReceivingMailbox.ReceivingMailboxStatus status;
+    try {
+      status = _receivingMailbox.offer(block, serializedStats, timeoutMs);
+    } catch (InterruptedException e) {
+      // We are not restoring the interrupt status because we are already 
throwing an exception
+      // Code that catches this exception must finish the work fast enough to 
comply the interrupt contract
+      // See https://github.com/apache/pinot/pull/16903#discussion_r2409003423
+      throw new QueryException(QueryErrorCode.INTERNAL, "Interrupted while 
sending data to mailbox: " + _id, e);
+    } catch (TimeoutException e) {
+      throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT, "Timed out 
adding block into mailbox: " + _id
+          + " with timeout: " + Duration.of(timeoutMs, ChronoUnit.MILLIS), e);
+    }
     switch (status) {
       case SUCCESS:
         break;
-      case CANCELLED:
-        throw new QueryCancelledException(String.format("Mailbox: %s already 
cancelled from upstream", _id));
-      case ERROR:
-        throw new QueryException(QueryErrorCode.INTERNAL, String.format(
-            "Mailbox: %s already errored out (received error block before)", 
_id));
-      case TIMEOUT:
-        throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
-            String.format("Timed out adding block into mailbox: %s with 
timeout: %dms", _id, timeoutMs));
-      case EARLY_TERMINATED:
+      case WAITING_EOS:
         _isEarlyTerminated = true;
         break;
+      case LAST_BLOCK:
+        _isTerminated = true;
+        break;
+      case ALREADY_TERMINATED:
+        LOGGER.error("Trying to offer blocks to the already closed mailbox {}. 
This should not happen", _id);
+        break;
       default:
         throw new IllegalStateException("Unsupported mailbox status: " + 
status);
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 9ed2416bfd7..3a58293367e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -55,11 +55,14 @@ public class MailboxService {
       
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS,
 TimeUnit.SECONDS)
           .removalListener((RemovalListener<String, ReceivingMailbox>) 
notification -> {
             if (notification.wasEvicted()) {
-              int numPendingBlocks = 
notification.getValue().getNumPendingBlocks();
+              ReceivingMailbox receivingMailbox = notification.getValue();
+              int numPendingBlocks = receivingMailbox.getNumPendingBlocks();
               if (numPendingBlocks > 0) {
                 LOGGER.warn("Evicting dangling receiving mailbox: {} with {} 
pending blocks", notification.getKey(),
                     numPendingBlocks);
               }
+              // In case there is a leak, we should cancel the mailbox to 
unblock any waiters and release resources.
+              receivingMailbox.cancel();
             }
           }).build();
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index a98721037c8..9bef391d21c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -22,15 +22,15 @@ import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.ThreadSafe;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
@@ -44,65 +44,58 @@ import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-/**
- * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is 
with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the send operator. 
This is because the ReceivingMailbox can be
- * initialized even before the corresponding OpChain is registered on the 
receiver, whereas the SendingMailbox is
- * initialized when the send operator is running.
- *
- * There is a single ReceivingMailbox for each {@link 
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator}.
- * The offer methods will be called when new blocks are received from 
different sources. For example local workers will
- * directly call {@link #offer(MseBlock, List, long)} while each remote worker 
opens a GPRC channel where messages
- * are sent in raw format and {@link #offerRaw(List, long)} is called from 
them.
- */
+/// Mailbox that's used to receive data. Ownership of the ReceivingMailbox is 
with the MailboxService, which is unlike
+/// the [SendingMailbox] whose ownership lies with the send operator. This is 
because the ReceivingMailbox can be
+/// initialized even before the corresponding OpChain is registered on the 
receiver, whereas the SendingMailbox is
+/// initialized when the send operator is running.
+///
+/// There is a single ReceivingMailbox for each pair of (sender, receiver) 
opchains. This means that each receive
+/// operator will have multiple ReceivingMailbox instances, one for each 
sender. They are coordinated by a
+/// [BlockingMultiStreamConsumer].
+///
+/// A ReceivingMailbox can have at most one reader and one writer at any given 
time. This means that different threads
+/// writing to the same mailbox must be externally synchronized.
+///
+/// The offer methods will be called when new blocks are received from 
different sources. For example local workers will
+/// directly call [#offer(MseBlock, List, long)] while each remote worker 
opens a GPRC channel where messages
+/// are sent in raw format and [#offerRaw(List, long)] is called from them.
 @ThreadSafe
 public class ReceivingMailbox {
   public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ReceivingMailbox.class);
-  // This was previously a static final attribute, but now that includes 
server and stage, we cannot use constants
-  private volatile MseBlockWithStats _cancelledErrorBlock;
 
   private final String _id;
   // TODO: Make the queue size configurable
-  // TODO: Revisit if this is the correct way to apply back pressure
+  // TODO: Apply backpressure at the sender side when the queue is full.
   /// The queue where blocks are going to be stored.
-  private final BlockingQueue<MseBlockWithStats> _blocks = new 
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
-  private final AtomicReference<MseBlockWithStats> _errorBlock = new 
AtomicReference<>();
-  private volatile boolean _isEarlyTerminated = false;
+  private final CancellableBlockingQueue _blocks;
   private long _lastArriveTime = System.currentTimeMillis();
 
-  @Nullable
-  private volatile Reader _reader;
   private final StatMap<StatKey> _stats = new StatMap<>(StatKey.class);
 
-  public ReceivingMailbox(String id) {
+  public ReceivingMailbox(String id, int maxPendingBlocks) {
     _id = id;
+    _blocks = new CancellableBlockingQueue(id, maxPendingBlocks);
+  }
+
+  public ReceivingMailbox(String id) {
+    this(id, DEFAULT_MAX_PENDING_BLOCKS);
   }
 
   public void registeredReader(Reader reader) {
-    if (_reader != null) {
-      throw new IllegalArgumentException("Only one reader is supported");
-    }
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("==[MAILBOX]== Reader registered for mailbox: " + _id);
-    }
-    _reader = reader;
+    _blocks.registerReader(reader);
   }
 
   public String getId() {
     return _id;
   }
 
-  /**
-   * Offers a raw block into the mailbox within the timeout specified, returns 
whether the block is successfully added.
-   * If the block is not added, an error block is added to the mailbox.
-   * <p>
-   * Contrary to {@link #offer(MseBlock, List, long)}, the block may be an 
error block.
-   */
+  ///Offers a raw block into the mailbox within the timeout specified, returns 
whether the block is successfully added.
+  ///
+  ///Contrary to [#offer(MseBlock, List, long)], the block may be an error 
block.
   public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long 
timeoutMs)
-      throws IOException {
+      throws IOException, InterruptedException, TimeoutException {
     updateWaitCpuTime();
 
     long startTimeMs = System.currentTimeMillis();
@@ -123,11 +116,8 @@ public class ReceivingMailbox {
       } else {
         MetadataBlock metadataBlock = (MetadataBlock) dataBlock;
         Map<QueryErrorCode, String> exceptionsByQueryError = 
QueryErrorCode.fromKeyMap(exceptions);
-        ErrorMseBlock errorBlock =
-            new ErrorMseBlock(metadataBlock.getStageId(), 
metadataBlock.getWorkerId(), metadataBlock.getServerId(),
+        block = new ErrorMseBlock(metadataBlock.getStageId(), 
metadataBlock.getWorkerId(), metadataBlock.getServerId(),
                 exceptionsByQueryError);
-        setErrorBlock(errorBlock, dataBlock.getStatsByStage());
-        return ReceivingMailboxStatus.FIRST_ERROR;
       }
     } else {
       block = new SerializedDataBlock(dataBlock);
@@ -135,62 +125,49 @@ public class ReceivingMailbox {
     return offerPrivate(block, dataBlock.getStatsByStage(), timeoutMs);
   }
 
-  public ReceivingMailboxStatus offer(MseBlock block, List<DataBuffer> 
serializedStats, long timeoutMs) {
+  /// Offers a non-error block into the mailbox within the timeout specified, 
returns whether the block is successfully
+  /// added.
+  public ReceivingMailboxStatus offer(MseBlock block, List<DataBuffer> 
serializedStats, long timeoutMs)
+      throws InterruptedException, TimeoutException {
     updateWaitCpuTime();
     _stats.merge(StatKey.IN_MEMORY_MESSAGES, 1);
-    if (block instanceof ErrorMseBlock) {
-      setErrorBlock((ErrorMseBlock) block, serializedStats);
-      return ReceivingMailboxStatus.EARLY_TERMINATED;
-    }
     return offerPrivate(block, serializedStats, timeoutMs);
   }
 
-  /**
-   * Offers a non-error block into the mailbox within the timeout specified, 
returns whether the block is successfully
-   * added. If the block is not added, an error block is added to the mailbox.
-   */
-  private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer> 
stats, long timeoutMs) {
-    MseBlockWithStats errorBlock = _errorBlock.get();
-    if (errorBlock != null) {
-      LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring 
the late block", _id);
-      return errorBlock == _cancelledErrorBlock ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
-    }
-    if (timeoutMs <= 0) {
-      LOGGER.debug("Mailbox: {} is already timed out", _id);
-      setErrorBlock(
-          ErrorMseBlock.fromException(new TimeoutException("Timed out while 
offering data to mailbox: " + _id)),
-          stats);
-      return ReceivingMailboxStatus.TIMEOUT;
-    }
+  /// Offers a non-error block into the mailbox within the timeout specified, 
returns whether the block is successfully
+  /// added.
+  private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer> 
stats, long timeoutMs)
+      throws InterruptedException, TimeoutException {
+    long start = System.currentTimeMillis();
     try {
-      long now = System.currentTimeMillis();
-      MseBlockWithStats blockWithStats = new MseBlockWithStats(block, stats);
-      boolean accepted = _blocks.offer(blockWithStats, timeoutMs, 
TimeUnit.MILLISECONDS);
-      _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() - 
now);
-      if (accepted) {
-        errorBlock = _errorBlock.get();
-        if (errorBlock == null) {
+      ReceivingMailboxStatus result;
+      if (block.isEos()) {
+        result = _blocks.offerEos((MseBlock.Eos) block, stats);
+      } else {
+        result = _blocks.offerData((MseBlock.Data) block, timeoutMs, 
TimeUnit.MILLISECONDS);
+      }
+
+      switch (result) {
+        case SUCCESS:
+        case LAST_BLOCK:
+          _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() - 
start);
           if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from 
mailbox: " + _id);
           }
-          notifyReader();
-          return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED 
: ReceivingMailboxStatus.SUCCESS;
-        } else {
-          LOGGER.debug("Mailbox: {} is already cancelled or errored out, 
ignoring the late block", _id);
-          _blocks.clear();
-          return errorBlock == _cancelledErrorBlock ? 
ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
-        }
-      } else {
-        LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", 
_id, timeoutMs);
-        TimeoutException exception = new TimeoutException(
-            "Timed out while waiting for receive operator to consume data from 
mailbox: " + _id);
-        setErrorBlock(ErrorMseBlock.fromException(exception), stats);
-        return ReceivingMailboxStatus.TIMEOUT;
+          break;
+        case WAITING_EOS:
+        case ALREADY_TERMINATED:
+        default:
+          // Nothing to do
       }
+      return result;
+    } catch (TimeoutException e) {
+      _stats.merge(StatKey.OFFER_CPU_TIME_MS, System.currentTimeMillis() - 
start);
+      throw e;
     } catch (InterruptedException e) {
-      LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
-      setErrorBlock(ErrorMseBlock.fromException(e), stats);
-      return ReceivingMailboxStatus.ERROR;
+      String errorMessage = "Interrupted on mailbox " + _id + " while offering 
blocks";
+      setErrorBlock(ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, 
errorMessage), stats);
+      throw e;
     }
   }
 
@@ -200,63 +177,34 @@ public class ReceivingMailbox {
     _lastArriveTime = now;
   }
 
-  /**
-   * Sets an error block into the mailbox. No more blocks are accepted after 
calling this method.
-   */
+  /// Sets an error block into the mailbox. No more blocks are accepted after 
calling this method.
   public void setErrorBlock(ErrorMseBlock errorBlock, List<DataBuffer> 
serializedStats) {
-    if (_errorBlock.compareAndSet(null, new MseBlockWithStats(errorBlock, 
serializedStats))) {
-      _blocks.clear();
-      notifyReader();
-    }
+    _blocks.offerEos(errorBlock, serializedStats);
   }
 
-  /**
-   * Returns the first block from the mailbox, or {@code null} if there is no 
block received yet. Error block is
-   * returned if exists.
-   */
+  /// Returns the first block from the mailbox, or {@code null} if there is no 
block received yet.
   @Nullable
   public MseBlockWithStats poll() {
-    Preconditions.checkState(_reader != null, "A reader must be registered");
-    MseBlockWithStats errorBlock = _errorBlock.get();
-    return errorBlock != null ? errorBlock : _blocks.poll();
+    return _blocks.poll();
   }
 
-  /**
-   * Early terminate the mailbox, called when upstream doesn't expect any more 
data block.
-   */
+  /// Early terminate the mailbox, called when upstream doesn't expect any 
more *data* block.
   public void earlyTerminate() {
-    _isEarlyTerminated = true;
+    _blocks.earlyTerminate();
   }
 
-  /**
-   * Cancels the mailbox. No more blocks are accepted after calling this 
method. Should only be called by the receive
-   * operator to clean up the remaining blocks.
-   */
+  /// Cancels the mailbox. No more blocks are accepted after calling this 
method and [#poll] will always return
+  /// an error block.
   public void cancel() {
     LOGGER.debug("Cancelling mailbox: {}", _id);
-    if (_errorBlock.get() == null) {
-      MseBlockWithStats errorBlock = new MseBlockWithStats(
-          ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled 
by receiver"),
-          Collections.emptyList());
-      if (_errorBlock.compareAndSet(null, errorBlock)) {
-        _cancelledErrorBlock = errorBlock;
-        _blocks.clear();
-      }
-    }
+    _blocks.offerEos(ErrorMseBlock.fromException(null), List.of());
   }
 
+  /// Returns the number of pending **data** blocks in the mailbox.
+  ///
+  /// Remember that the EOS block is not counted here.
   public int getNumPendingBlocks() {
-    return _blocks.size();
-  }
-
-  private void notifyReader() {
-    Reader reader = _reader;
-    if (reader != null) {
-      LOGGER.debug("Notifying reader");
-      reader.blockReadyToRead();
-    } else {
-      LOGGER.debug("No reader to notify");
-    }
+    return _blocks.exactSize();
   }
 
   public StatMap<StatKey> getStatMap() {
@@ -268,7 +216,26 @@ public class ReceivingMailbox {
   }
 
   public enum ReceivingMailboxStatus {
-    SUCCESS, FIRST_ERROR, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
+    /// The block was successfully added to the mailbox.
+    ///
+    /// More blocks can be sent.
+    SUCCESS,
+    /// The block is rejected because downstream has early terminated and now 
is only waiting for EOS in order to
+    /// get the stats.
+    ///
+    /// More blocks can be sent, but data blocks will be rejected.
+    WAITING_EOS,
+    /// The received message is the last block the mailbox will ever read.
+    ///
+    /// This happens for example when an EOS block is added to the mailbox.
+    ///
+    /// No more blocks can be sent.
+    LAST_BLOCK,
+    /// The mailbox has been closed for write. There may still be pending 
blocks to read, but no more blocks
+    /// can be added.
+    ///
+    /// No more blocks can be sent.
+    ALREADY_TERMINATED
   }
 
   public enum StatKey implements StatMap.Key {
@@ -315,4 +282,377 @@ public class ReceivingMailbox {
       return _serializedStats;
     }
   }
+
+  /// The state of the queue.
+  ///
+  /// ```
+  /// +-------------------+   offerEos    +-------------------+
+  /// |    FULL_OPEN      | ----------->  |  UPSTREAM_FINISHED|
+  /// +-------------------+               +-------------------+
+  ///       |                                 |
+  ///       | earlyTerminate                  | poll -- when all pending data 
is read
+  ///       v                                 v
+  /// +-------------------+   offerEos   +-------------------+
+  /// |   WAITING_EOS     | -----------> |   FULL_CLOSED     |
+  /// +-------------------+              +-------------------+
+  /// ```
+  private enum State {
+    /// The queue is open for both read and write.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue, or null if the 
queue is empty.
+    /// - [#offer] accepts both data and EOS blocks.
+    ///
+    /// Transitions to [State#UPSTREAM_FINISHED] when an EOS block is offered 
or to [State#WAITING_EOS] when
+    /// [#earlyTerminate()] is called.
+    FULL_OPEN,
+    /// The downstream is not interested in reading more data but is waiting 
for an EOS block to get the stats.
+    ///
+    /// - [#poll()] returns null.
+    /// - [#offer] rejects all data blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when an EOS block is offered.
+    WAITING_EOS,
+    /// The upstream has indicated that no more data will be sent.
+    ///
+    /// - [#poll()] returns the pending blocks in the queue and then the EOS 
block.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// Transitions to [State#FULL_CLOSED] when the EOS block is read by 
[#poll()].
+    UPSTREAM_FINISHED,
+    /// The queue is closed for both read and write.
+    ///
+    /// - [#poll()] always returns the EOS block, which is always not null.
+    /// - [#offer] rejects all blocks.
+    ///
+    /// No transitions out of this state.
+    FULL_CLOSED
+  }
+
+  /// This is a special bounded blocking queue implementation similar to 
ArrayBlockingQueue, but:
+  /// - Only accepts a single reader (aka downstream).
+  /// - Only accepts a multiple concurrent writers (aka upstream)
+  /// - Can be [closed for write][#closeForWrite(MseBlock.Eos, List)].
+  /// - Can be [#earlyTerminate()]d.
+  ///
+  /// Read the [State] enum to understand the different states and their 
transitions.
+  ///
+  /// All methods of this class are thread-safe and may block, although only 
[#offer] should block for a long time.
+  @ThreadSafe
+  private static class CancellableBlockingQueue {
+    private final String _id;
+    @Nullable
+    private volatile Reader _reader;
+    /// This is set when the queue is in [State#FULL_CLOSED] or 
[State#UPSTREAM_FINISHED].
+    @Nullable
+    @GuardedBy("_lock")
+    private MseBlockWithStats _eos;
+    /// The current state of the queue.
+    ///
+    /// All changes to this field must be done by calling [#changeState(State, 
String)] in order to log the state
+    /// transitions.
+    @GuardedBy("_lock")
+    private State _state = State.FULL_OPEN;
+    /// The items in the queue.
+    ///
+    /// This is a circular array where [#_putIndex] is the index to add the 
next item and [#_takeIndex] is the index to
+    /// take the next item from. Only data blocks are stored in this array, 
the EOS block is stored in [#_eos].
+    ///
+    /// Like in normal blocking queues, elements are added when upstream 
threads call [#offer] and removed when the
+    /// downstream thread calls [#poll]. Unlike normal blocking queues, 
elements will be [removed][#drainDataBlocks()]
+    /// when transitioning to [State#WAITING_EOS] or [State#FULL_CLOSED].
+    @GuardedBy("_lock")
+    private final MseBlock.Data[] _dataBlocks;
+    @GuardedBy("_lock")
+    private int _takeIndex;
+    @GuardedBy("_lock")
+    private int _putIndex;
+    @GuardedBy("_lock")
+    private int _count;
+    /// Threads waiting to add more data to the queue.
+    ///
+    /// This is used to prevent the following situation:
+    /// 1. The queue is full.
+    /// 2. Thread A tries to add data. Thread A will be blocked waiting for 
space in the queue.
+    /// 3. Thread B adds an EOS block, which will transition the queue to 
[State#UPSTREAM_FINISHED].
+    /// 4. Thread C reads data from the queue in a loop, the scheduler doesn't 
give time to Thread A.
+    /// 5. Thread C consumes all data from the queue and then reads the EOS 
block.
+    /// 6. Finally Thread A is unblocked and adds data to the queue, even 
though the queue is already closed for write
+    ///
+    /// As a result the block from A will be lost. Instead, we use this 
counter to return null in [#poll] when the
+    /// queue is empty but there are still threads trying to add data to the 
queue.
+    @GuardedBy("_lock")
+    private int _pendingData;
+    private final ReentrantLock _lock = new ReentrantLock();
+    private final Condition _notFull = _lock.newCondition();
+
+    public CancellableBlockingQueue(String id, int capacity) {
+      _id = id;
+      _dataBlocks = new MseBlock.Data[capacity];
+    }
+
+    /// Notifies the downstream that there is data to read.
+    private void notifyReader() {
+      Reader reader = _reader;
+      if (reader != null) {
+        LOGGER.debug("Notifying reader");
+        reader.blockReadyToRead();
+      } else {
+        LOGGER.debug("No reader to notify");
+      }
+    }
+
+    /// Offers a successful or erroneous EOS block into the queue, returning 
the status of the operation.
+    ///
+    /// This method never blocks for long, as it doesn't need to wait for 
space in the queue.
+    public ReceivingMailboxStatus offerEos(MseBlock.Eos block, 
List<DataBuffer> stats) {
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+          case UPSTREAM_FINISHED:
+            // The queue is closed for write. Always reject the block.
+            LOGGER.debug("Mailbox: {} is already closed for write, ignoring 
the late {} block", _id, block);
+            return ReceivingMailboxStatus.ALREADY_TERMINATED;
+          case WAITING_EOS:
+            // We got the EOS block we expected. Close the queue for both read 
and write.
+            changeState(State.FULL_CLOSED, "received EOS block");
+            _eos = new MseBlockWithStats(block, stats);
+            notifyReader();
+            return ReceivingMailboxStatus.LAST_BLOCK;
+          case FULL_OPEN:
+            changeState(State.UPSTREAM_FINISHED, "received EOS block");
+            _eos = new MseBlockWithStats(block, stats);
+            notifyReader();
+            if (block.isError()) {
+              drainDataBlocks();
+            }
+            return ReceivingMailboxStatus.LAST_BLOCK;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /// Offers a data block into the queue within the timeout specified, 
returning the status of the operation.
+    public ReceivingMailboxStatus offerData(MseBlock.Data block, long timeout, 
TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+      ReentrantLock lock = _lock;
+      lock.lockInterruptibly();
+      try {
+        while (true) {
+          switch (_state) {
+            case FULL_CLOSED:
+            case UPSTREAM_FINISHED:
+              // The queue is closed for write. Always reject the block.
+              LOGGER.debug("Mailbox: {} is already closed for write, ignoring 
the late data block", _id);
+              return ReceivingMailboxStatus.ALREADY_TERMINATED;
+            case WAITING_EOS:
+              // The downstream is not interested in reading more data.
+              LOGGER.debug("Mailbox: {} is not interesting in late data 
block", _id);
+              return ReceivingMailboxStatus.WAITING_EOS;
+            case FULL_OPEN:
+              if (offerDataToBuffer(block, timeout, timeUnit)) {
+                notifyReader();
+                return ReceivingMailboxStatus.SUCCESS;
+              }
+              // otherwise transitioned to FULL_CLOSED or WAITING_EOS while 
waiting for space in the queue
+              // and we need to re-evaluate the state
+              break;
+            default:
+              throw new IllegalStateException("Unexpected state: " + _state);
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /// Offers a data block into the queue within the timeout specified, 
returning true if the block was added
+    /// successfully.
+    ///
+    /// This method can only be called while the queue is in the FULL_OPEN 
state and the lock is held.
+    ///
+    /// This method can time out, in which case we automatically transition to 
the [State#FULL_CLOSED] state.
+    /// But instead of returning false, we throw a [TimeoutException]. This is 
because the caller may want to
+    /// distinguish between a timeout and other reasons for not being able to 
add the block to the queue in order to
+    /// report different error messages.
+    ///
+    /// @return true if the block was added successfully, false if the state 
changed while waiting.
+    /// @throws InterruptedException if the thread is interrupted while 
waiting for space in the queue.
+    /// @throws TimeoutException if the timeout specified elapsed before space 
was available in the queue.
+    @GuardedBy("_lock")
+    private boolean offerDataToBuffer(MseBlock.Data block, long timeout, 
TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+
+      assert _state == State.FULL_OPEN;
+
+      long nanos = timeUnit.toNanos(timeout);
+      MseBlock.Data[] items = _dataBlocks;
+      _pendingData++;
+      try {
+        while (_count == items.length && nanos > 0L) {
+          nanos = _notFull.awaitNanos(nanos);
+
+          switch (_state) {
+            case FULL_OPEN: // we are in the same state, continue waiting for 
space
+              break;
+            case FULL_CLOSED:
+            case WAITING_EOS:
+              // The queue is closed and the reader is not interested in 
reading more data.
+              return false;
+            case UPSTREAM_FINISHED:
+              // Another thread offered the EOS while we were waiting for 
space.
+              assert _eos != null;
+              if (_eos._block.isSuccess()) { // If closed with EOS, the reader 
is still interested in reading our block
+                continue;
+              }
+              // if closed with an error, the reader is not interested in 
reading our block
+              return false;
+            default:
+              throw new IllegalStateException("Unexpected state: " + _state);
+          }
+        }
+        if (nanos <= 0L) { // timed out
+          String errorMessage = "Timed out while waiting for receive operator 
to consume data from mailbox: " + _id;
+          ErrorMseBlock timeoutBlock = 
ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, errorMessage);
+          changeState(State.FULL_CLOSED, "timed out while waiting to offer 
data block");
+          drainDataBlocks();
+          _eos = new MseBlockWithStats(timeoutBlock, List.of());
+          notifyReader();
+          throw new TimeoutException(errorMessage);
+        }
+        items[_putIndex] = block;
+        if (++_putIndex == items.length) {
+          _putIndex = 0;
+        }
+        _count++;
+        return true;
+      } finally {
+        _pendingData--;
+      }
+    }
+
+    /// Returns the first block from the queue, or `null` if there is no block 
in the queue. The returned block will be
+    /// an error block if the queue has been cancelled or has encountered an 
error.
+    ///
+    /// This method may block briefly while acquiring the lock, but it doesn't 
actually require waiting for data in the
+    /// queue.
+    @Nullable
+    public MseBlockWithStats poll() {
+      Preconditions.checkState(_reader != null, "A reader must be registered");
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+            // The queue is closed for both read and write. Always return the 
error block.
+            assert _eos != null;
+            return _eos;
+          case WAITING_EOS:
+            // The downstream is not interested in reading more data but is 
waiting for an EOS block to get the stats.
+            // Polls returns null and only EOS blocks are accepted by offer.
+            assert _eos == null;
+            return null;
+          case UPSTREAM_FINISHED:
+            // The upstream has indicated that no more data will be sent. Poll 
returns pending blocks and then the EOS
+            // block.
+            if (_count == 0) {
+              if (_pendingData > 0) {
+                // There are still threads trying to add data to the queue. We 
should wait for them to finish.
+                LOGGER.debug("Mailbox: {} has pending {} data blocks, waiting 
for them to finish", _id, _pendingData);
+                return null;
+              } else {
+                changeState(State.FULL_CLOSED, "read all data blocks");
+                return _eos;
+              }
+            }
+            break;
+          case FULL_OPEN:
+            if (_count == 0) {
+              assert _eos == null;
+              return null;
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+        assert _count > 0 : "if we reach here, there must be data in the 
queue";
+        MseBlock.Data[] items = _dataBlocks;
+        MseBlock.Data block = items[_takeIndex];
+        assert block != null : "data block in the queue must not be null";
+        items[_takeIndex] = null;
+        if (++_takeIndex == items.length) {
+          _takeIndex = 0;
+        }
+        _count--;
+        _notFull.signal();
+
+        return new MseBlockWithStats(block, List.of());
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @GuardedBy("_lock")
+    private void changeState(State newState, String desc) {
+      LOGGER.debug("Mailbox: {} {}, transitioning from {} to {}", _id, desc, 
_state, newState);
+      _state = newState;
+    }
+
+    @GuardedBy("_lock")
+    private void drainDataBlocks() {
+      Arrays.fill(_dataBlocks, null);
+      _notFull.signalAll();
+      _count = 0;
+    }
+
+    public int exactSize() {
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        return _count;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /// Called by the downstream to indicate that no more data blocks will be 
read.
+    public void earlyTerminate() {
+      ReentrantLock lock = _lock;
+      lock.lock();
+      try {
+        switch (_state) {
+          case FULL_CLOSED:
+          case WAITING_EOS:
+            LOGGER.debug("Mailbox: {} is already closed for read", _id);
+            return;
+          case UPSTREAM_FINISHED:
+            drainDataBlocks();
+            changeState(State.FULL_CLOSED, "early terminated");
+            break;
+          case FULL_OPEN:
+            drainDataBlocks();
+            changeState(State.WAITING_EOS, "early terminated");
+            break;
+          default:
+            throw new IllegalStateException("Unexpected state: " + _state);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    public void registerReader(Reader reader) {
+      if (_reader != null) {
+        throw new IllegalArgumentException("Only one reader is supported");
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("==[MAILBOX]== Reader registered for mailbox: " + _id);
+      }
+      _reader = reader;
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index d11329f112b..65a0d69831c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
@@ -54,16 +52,14 @@ public interface SendingMailbox {
    * and they should <b>not</b> acquire any resources when they are created. 
This method should throw if there was an
    * error sending the data, since that would allow {@link BlockExchange} to 
exit early.
    */
-  void send(MseBlock.Data data)
-      throws IOException, TimeoutException;
+  void send(MseBlock.Data data);
 
   /**
    * Sends an EOS block to the receiver. Note that SendingMailbox are required 
to acquire resources lazily in this call,
    * and they should <b>not</b> acquire any resources when they are created. 
This method should throw if there was an
    * error sending the data, since that would allow {@link BlockExchange} to 
exit early.
    */
-  void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
-      throws IOException, TimeoutException;
+  void send(MseBlock.Eos block, List<DataBuffer> serializedStats);
 
   /**
    * Cancels the mailbox and notifies the receiver of the cancellation so that 
it can release the underlying resources.
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index cae518fb194..d8a2f04d0c0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
@@ -49,6 +50,7 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
   private final MailboxService _mailboxService;
   private final StreamObserver<MailboxStatus> _responseObserver;
   private final List<ByteBuffer> _mailboxBuffers = 
Collections.synchronizedList(new ArrayList<>());
+  private boolean _closedStream = false;
 
   private volatile ReceivingMailbox _mailbox;
 
@@ -61,6 +63,10 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
 
   @Override
   public void onNext(MailboxContent mailboxContent) {
+    if (_closedStream) {
+      LOGGER.debug("Received a late message once the stream was closed. 
Ignoring it.");
+      return;
+    }
     String mailboxId = mailboxContent.getMailboxId();
     if (_mailbox == null) {
       _mailbox = _mailboxService.getReceivingMailbox(mailboxId);
@@ -71,50 +77,60 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
     }
     try {
       long timeoutMs = 
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
-      ReceivingMailbox.ReceivingMailboxStatus status = 
_mailbox.offerRaw(_mailboxBuffers, timeoutMs);
-      _mailboxBuffers.clear();
+      ReceivingMailbox.ReceivingMailboxStatus status = null;
+      try {
+        status = _mailbox.offerRaw(_mailboxBuffers, timeoutMs);
+      } catch (TimeoutException e) {
+        LOGGER.debug("Timed out adding block into mailbox: {} with timeout: 
{}ms", mailboxId, timeoutMs);
+        closeStream();
+        return;
+      } catch (InterruptedException e) {
+        // We are not restoring the interrupt status because we are already 
throwing an exception
+        // Code that catches this exception must finish the work fast enough 
to comply the interrupt contract
+        // See 
https://github.com/apache/pinot/pull/16903#discussion_r2409003423
+        LOGGER.debug("Interrupted while processing blocks for mailbox: {}", 
mailboxId, e);
+        closeStream();
+        return;
+      }
       switch (status) {
         case SUCCESS:
           
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
               .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
                   Integer.toString(_mailbox.getNumPendingBlocks())).build());
           break;
-        case CANCELLED:
-          LOGGER.warn("Mailbox: {} already cancelled from upstream", 
mailboxId);
-          cancelStream();
-          break;
-        case FIRST_ERROR:
-          return;
-        case ERROR:
-          LOGGER.warn("Mailbox: {} already errored out (received error block 
before)", mailboxId);
-          cancelStream();
-          break;
-        case TIMEOUT:
-          LOGGER.warn("Timed out adding block into mailbox: {} with timeout: 
{}ms", mailboxId, timeoutMs);
-          cancelStream();
-          break;
-        case EARLY_TERMINATED:
-          LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
+        case WAITING_EOS:
+          // The receiving mailbox is early terminated, inform the sender to 
stop sending more data. Only EOS block is
+          // expected to be sent afterward.
           
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
               
.putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, 
"true").build());
           break;
+        case LAST_BLOCK:
+          LOGGER.debug("Mailbox: {} has received the last block, closing the 
stream", mailboxId);
+          closeStream();
+          break;
+        case ALREADY_TERMINATED:
+          LOGGER.error("Trying to offer blocks to the already closed mailbox 
{}. This should not happen", mailboxId);
+          closeStream();
+          break;
         default:
           throw new IllegalStateException("Unsupported mailbox status: " + 
status);
       }
     } catch (Exception e) {
-      _mailboxBuffers.clear();
       String errorMessage = "Caught exception while processing blocks for 
mailbox: " + mailboxId;
       LOGGER.error(errorMessage, e);
+      closeStream();
       _mailbox.setErrorBlock(
           ErrorMseBlock.fromException(new RuntimeException(errorMessage, e)), 
Collections.emptyList());
-      cancelStream();
+    } finally {
+      _mailboxBuffers.clear();
     }
   }
 
-  private void cancelStream() {
+  private void closeStream() {
     try {
       // NOTE: DO NOT use onError() because it will terminate the stream, and 
sender might not get the callback
       _responseObserver.onCompleted();
+      _closedStream = true;
     } catch (Exception e) {
       // Exception can be thrown if the stream is already closed, so we simply 
ignore it
       LOGGER.debug("Caught exception cancelling mailbox: {}", _mailbox != null 
? _mailbox.getId() : "unknown", e);
@@ -132,11 +148,19 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
     } else {
       LOGGER.error("Got error before mailbox is set up", t);
     }
+    if (!_closedStream) {
+      _closedStream = true;
+      _responseObserver.onError(t);
+    }
   }
 
   @Override
   public void onCompleted() {
     _mailboxBuffers.clear();
+    if (_closedStream) {
+      return;
+    }
+    _closedStream = true;
     try {
       _responseObserver.onCompleted();
     } catch (Exception e) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 875e46bc6e8..72da1a20aba 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -74,6 +73,8 @@ import 
org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
 import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.ExecutorServiceUtils;
 import org.apache.pinot.spi.executor.HardLimitExecutor;
 import org.apache.pinot.spi.executor.MetricsExecutor;
@@ -327,21 +328,32 @@ public class QueryRunner {
     }
     long deadlineMs = executionContext.getPassiveDeadlineMs();
     for (RoutingInfo routingInfo : routingInfos) {
+      String mailboxId = routingInfo.getMailboxId();
       try {
         StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
         SendingMailbox sendingMailbox =
             _mailboxService.getSendingMailbox(routingInfo.getHostname(), 
routingInfo.getPort(),
-                routingInfo.getMailboxId(), deadlineMs, statMap);
+                mailboxId, deadlineMs, statMap);
         // TODO: Here we are breaking the stats invariants, sending errors 
without including the stats of the
         //  current stage. We will need to fix this in future, but for now, we 
are sending the error block without
         //  the stats.
         sendingMailbox.send(errorBlock, Collections.emptyList());
-      } catch (TimeoutException e) {
-        LOGGER.warn("Timed out sending error block to mailbox: {} for request: 
{}, stage: {}",
-            routingInfo.getMailboxId(), requestId, stageId, e);
+      } catch (QueryException e) {
+        QueryErrorCode errorCode = e.getErrorCode();
+        switch (errorCode) {
+          case EXECUTION_TIMEOUT:
+            LOGGER.warn("Timed out sending error block to mailbox: {}", 
mailboxId, e);
+            break;
+          case QUERY_CANCELLATION:
+            LOGGER.info("Query cancelled while offering blocks to mailbox: 
{}", mailboxId);
+            break;
+          default:
+            LOGGER.error("{} exception while exception sending error block to 
mailbox: {}", errorCode, mailboxId, e);
+            break;
+        }
       } catch (Exception e) {
         LOGGER.error("Caught exception sending error block to mailbox: {} for 
request: {}, stage: {}",
-            routingInfo.getMailboxId(), requestId, stageId, e);
+            mailboxId, requestId, stageId, e);
       }
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 753ec9744b6..250395042a0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -77,12 +77,12 @@ public abstract class BaseMailboxReceiveOperator extends 
MultiStageOperator {
         asyncStreams.add(asyncStream);
         _receivingStats.add(asyncStream._mailbox.getStatMap());
       }
-      _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context, 
asyncStreams);
+      _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context, 
asyncStreams, senderStageId);
     } else {
       // TODO: Revisit if we should throw exception here.
       _mailboxIds = List.of();
       _receivingStats = List.of();
-      _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context, 
List.of());
+      _multiConsumer = new BlockingMultiStreamConsumer.OfMseBlock(context, 
List.of(), senderStageId);
     }
     _statMap.merge(StatKey.FAN_IN, _mailboxIds.size());
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 0de27d40f7e..fe24b253f55 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
@@ -45,6 +44,7 @@ import 
org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
 import org.apache.pinot.spi.exception.QueryCancelledException;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.exception.TerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -213,9 +213,7 @@ public class MailboxSendOperator extends MultiStageOperator 
{
       if (block.isEos()) {
         sendEos((MseBlock.Eos) block);
       } else {
-        if (sendMseBlock(((MseBlock.Data) block))) {
-          earlyTerminate();
-        }
+        sendMseBlock(((MseBlock.Data) block));
       }
       checkTerminationAndSampleUsage();
       return block;
@@ -225,15 +223,14 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     } catch (TerminationException e) {
       LOGGER.info("Query was terminated for opChain: {}", _context.getId(), e);
       return ErrorMseBlock.fromException(e);
-    } catch (TimeoutException e) {
-      LOGGER.warn("Timed out transferring data on opChain: {}", 
_context.getId(), e);
+    } catch (QueryException e) {
       return ErrorMseBlock.fromException(e);
-    } catch (Exception e) {
+    } catch (RuntimeException e) {
       ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
       try {
         LOGGER.error("Exception while transferring data on opChain: {}", 
_context.getId(), e);
         sendEos(errorBlock);
-      } catch (Exception e2) {
+      } catch (RuntimeException e2) {
         LOGGER.error("Exception while sending error block.", e2);
       }
       return errorBlock;
@@ -246,9 +243,7 @@ public class MailboxSendOperator extends MultiStageOperator 
{
     return _context.getPassiveDeadlineMs();
   }
 
-  private void sendEos(MseBlock.Eos eosBlockWithoutStats)
-      throws Exception {
-
+  private void sendEos(MseBlock.Eos eosBlockWithoutStats) {
     MultiStageQueryStats stats = null;
     List<DataBuffer> serializedStats;
     if (_context.isSendStats()) {
@@ -276,22 +271,20 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     return new StatMap<>(_statMap);
   }
 
-  private boolean sendMseBlock(MseBlock.Data block)
-      throws Exception {
-    boolean isEarlyTerminated = _exchange.send(block);
+  private void sendMseBlock(MseBlock.Data block) {
+    if (_exchange.send(block)) {
+      earlyTerminate();
+    }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[SEND]== Block " + block + " sent from: " + 
_context.getId());
     }
-    return isEarlyTerminated;
   }
 
-  private boolean sendMseBlock(MseBlock.Eos block, List<DataBuffer> 
serializedStats)
-      throws Exception {
-    boolean isEarlyTerminated = _exchange.send(block, serializedStats);
+  private void sendMseBlock(MseBlock.Eos block, List<DataBuffer> 
serializedStats) {
+    _exchange.send(block, serializedStats);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[SEND]== Block " + block + " sent from: " + 
_context.getId());
     }
-    return isEarlyTerminated;
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 7b0290a69ec..ced7906c564 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.plan.ExplainInfo;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.operator.set.SetOperator;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -132,6 +133,14 @@ public abstract class MultiStageOperator implements 
Operator<MseBlock>, AutoClos
   protected abstract MseBlock getNextBlock()
       throws Exception;
 
+  /**
+   * Signals the operator to terminate early.
+   *
+   * After this method is called, the operator should stop processing any more 
input and return a
+   * {@link SuccessMseBlock} block as soon as possible.
+   * This method should be called when the consumer of the operator does not 
need any more data and wants to stop the
+   * execution early to save resources.
+   */
   protected void earlyTerminate() {
     _isEarlyTerminated = true;
     for (MultiStageOperator child : getChildOperators()) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 261c032e965..c99c469a90a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -18,13 +18,9 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
@@ -100,11 +96,9 @@ public abstract class BlockExchange {
    * API to send a block to the destination mailboxes.
    * @param block the block to be transferred
    * @return true if all the mailboxes has been early terminated.
-   * @throws IOException when sending stream unexpectedly closed.
-   * @throws TimeoutException when sending stream timeout.
+   * @throws org.apache.pinot.spi.exception.QueryException if any mailbox 
fails to send the block, including on timeout.
    */
-  public boolean send(MseBlock.Data block)
-      throws IOException, TimeoutException {
+  public boolean send(MseBlock.Data block) {
     boolean isEarlyTerminated = true;
     for (SendingMailbox sendingMailbox : _sendingMailboxes) {
       if (!sendingMailbox.isEarlyTerminated()) {
@@ -122,11 +116,9 @@ public abstract class BlockExchange {
    * API to send a block to the destination mailboxes.
    * @param eosBlock the block to be transferred
    * @return true if all the mailboxes has been early terminated.
-   * @throws IOException when sending stream unexpectedly closed.
-   * @throws TimeoutException when sending stream timeout.
+   * @throws org.apache.pinot.spi.exception.QueryException if any mailbox 
fails to send the block, including on timeout.
    */
-  public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
-      throws IOException, TimeoutException {
+  public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats) 
{
     int mailboxIdToSendMetadata;
     if (!serializedStats.isEmpty()) {
       mailboxIdToSendMetadata = _statsIndexChooser.apply(_sendingMailboxes);
@@ -138,18 +130,18 @@ public abstract class BlockExchange {
       // this may happen when the block exchange is itself used as a sending 
mailbox, like when using spools
       mailboxIdToSendMetadata = -1;
     }
-    Exception firstException = null;
+    RuntimeException firstException = null;
     int numMailboxes = _sendingMailboxes.size();
     for (int i = 0; i < numMailboxes; i++) {
       try {
         SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
-        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? 
serializedStats : Collections.emptyList();
+        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? 
serializedStats : List.of();
 
         sendingMailbox.send(eosBlock, statsToSend);
         if (LOGGER.isTraceEnabled()) {
           LOGGER.trace("Block sent: {} {} to {}", eosBlock, 
System.identityHashCode(eosBlock), sendingMailbox);
         }
-      } catch (IOException | TimeoutException | RuntimeException e) {
+      } catch (RuntimeException e) {
         // We want to try to send EOS to all mailboxes, so we catch the 
exception and rethrow it at the end.
         if (firstException == null) {
           firstException = e;
@@ -159,22 +151,12 @@ public abstract class BlockExchange {
       }
     }
     if (firstException != null) {
-      // This is ugly, but necessary to be sure we throw the right exception, 
which is later caught by the
-      // QueryRunner and handled properly.
-      if (firstException instanceof IOException) {
-        throw (IOException) firstException;
-      } else if (firstException instanceof TimeoutException) {
-        throw (TimeoutException) firstException;
-      } else {
-        Preconditions.checkState(firstException instanceof RuntimeException);
-        throw (RuntimeException) firstException;
-      }
+      throw firstException;
     }
     return false;
   }
 
-  protected void sendBlock(SendingMailbox sendingMailbox, MseBlock.Data block)
-      throws IOException, TimeoutException {
+  protected void sendBlock(SendingMailbox sendingMailbox, MseBlock.Data block) 
{
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Sending block: {} {} to {}", block, 
System.identityHashCode(block), sendingMailbox);
     }
@@ -192,8 +174,7 @@ public abstract class BlockExchange {
     }
   }
 
-  protected abstract void route(List<SendingMailbox> destinations, 
MseBlock.Data block)
-      throws IOException, TimeoutException;
+  protected abstract void route(List<SendingMailbox> destinations, 
MseBlock.Data block);
 
   // Called when the OpChain gracefully returns.
   // TODO: This is a no-op right now.
@@ -237,8 +218,7 @@ public abstract class BlockExchange {
     }
 
     @Override
-    public void send(MseBlock.Data data)
-        throws IOException, TimeoutException {
+    public void send(MseBlock.Data data) {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Exchange mailbox {} echoing data block {} {}", this, 
data, System.identityHashCode(data));
       }
@@ -246,8 +226,7 @@ public abstract class BlockExchange {
     }
 
     @Override
-    public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
-        throws IOException, TimeoutException {
+    public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Exchange mailbox {} echoing EOS block {} {}", this, 
block, System.identityHashCode(block));
       }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index 941b4d05486..57f0ba415c1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
@@ -42,8 +40,7 @@ class BroadcastExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
-      throws IOException, TimeoutException {
+  protected void route(List<SendingMailbox> destinations, MseBlock.Data block) 
{
     for (SendingMailbox mailbox : destinations) {
       sendBlock(mailbox, block);
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 21ea3729ee2..14a6a6ac940 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -19,10 +19,8 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -54,8 +52,7 @@ class HashExchange extends BlockExchange {
 
   @SuppressWarnings({"rawtypes", "unchecked"})
   @Override
-  protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
-      throws IOException, TimeoutException {
+  protected void route(List<SendingMailbox> destinations, MseBlock.Data block) 
{
     int numMailboxes = destinations.size();
     if (numMailboxes == 1 || _keySelector == EmptyKeySelector.INSTANCE) {
       sendBlock(destinations.get(0), block);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index 6ad534c8a4c..1632018d60b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -19,10 +19,8 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.IntFunction;
 import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -56,8 +54,7 @@ class RandomExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox> destinations, MseBlock.Data block)
-      throws IOException, TimeoutException {
+  protected void route(List<SendingMailbox> destinations, MseBlock.Data block) 
{
     int destinationIdx = _rand.apply(destinations.size());
     sendBlock(destinations.get(destinationIdx), block);
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index f6fd85d9545..32d2df68a58 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.base.Preconditions;
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
@@ -45,8 +43,7 @@ class SingletonExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox> sendingMailboxes, MseBlock.Data 
block)
-      throws IOException, TimeoutException {
+  protected void route(List<SendingMailbox> sendingMailboxes, MseBlock.Data 
block) {
     sendBlock(sendingMailboxes.get(0), block);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index e04e6000aeb..ca6e3dcc8ff 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -344,12 +344,14 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
     private final int _stageId;
     @Nullable
     private MultiStageQueryStats _stats;
+    private final int _senderStageId;
 
     public OfMseBlock(OpChainExecutionContext context,
-        List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>> 
asyncProducers) {
+        List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>> 
asyncProducers, int senderStageId) {
       super(context.getId(), context.getPassiveDeadlineMs(), asyncProducers);
       _stageId = context.getStageId();
       _stats = MultiStageQueryStats.emptyStats(context.getStageId());
+      _senderStageId = senderStageId;
     }
 
     @Override
@@ -393,7 +395,7 @@ public abstract class BlockingMultiStreamConsumer<E> 
implements AutoCloseable {
         return onException(terminateException.getErrorCode(), 
terminateException.getMessage());
       }
       // TODO: Add the sender stage id to the error message
-      String errMsg = "Timed out on stage " + _stageId + " waiting for data 
sent by a child stage";
+      String errMsg = "Timed out on stage " + _stageId + " waiting for data 
from child stage " + _senderStageId;
       // We log this case as debug because:
       // - The opchain will already log a stackless message once the opchain 
fail
       // - The trace is not useful (the log message is good enough to find 
where we failed)
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index f5f1544e31e..78f21ac0569 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -97,17 +97,21 @@ public class MailboxServiceTest {
     receivingMailbox.registeredReader(() -> {
     });
     for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
-      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
       List<Object[]> rows = getRows(receivingMailbox);
       assertEquals(rows.size(), 1);
       assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
     }
-    assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+    assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
     ReceivingMailbox.MseBlockWithStats block = receivingMailbox.poll();
     assertNotNull(block);
     assertTrue(block.getBlock().isSuccess());
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
-    assertNull(receivingMailbox.poll());
+
+    // Following calls to poll() should return the same eos block
+    block = receivingMailbox.poll();
+    assertNotNull(block);
+    assertTrue(block.getBlock().isSuccess());
   }
 
   @Test
@@ -132,17 +136,16 @@ public class MailboxServiceTest {
     assertEquals(numCallbacks.get(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
 
     for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
-      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
       List<Object[]> rows = getRows(receivingMailbox);
       assertEquals(rows.size(), 1);
       assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
     }
-    assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+    assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
     MseBlock block = readBlock(receivingMailbox);
     assertNotNull(block);
     assertTrue(block.isEos());
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
-    assertNull(receivingMailbox.poll());
   }
 
   @Test
@@ -213,7 +216,7 @@ public class MailboxServiceTest {
     // Send one data block and then cancel
     sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{"0"}));
     receivingMailbox.cancel();
-    assertEquals(numCallbacks.get(), 1);
+    assertEquals(numCallbacks.get(), 2);
 
     // Data blocks will be cleaned up
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
@@ -224,7 +227,7 @@ public class MailboxServiceTest {
     // Cancel is idempotent for both sending and receiving mailbox, so safe to 
call multiple times
     sendingMailbox.cancel(new Exception("TEST ERROR"));
     receivingMailbox.cancel();
-    assertEquals(numCallbacks.get(), 1);
+    assertEquals(numCallbacks.get(), 2);
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
   }
 
@@ -281,7 +284,8 @@ public class MailboxServiceTest {
 
     // Next send will throw exception because buffer is full
     try {
-      sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
+      // The block sent must be a data block, as error and eos blocks are 
always accepted
+      sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(1)}));
       fail("Except exception when sending data after buffer is full");
     } catch (Exception e) {
       // Expected
@@ -336,7 +340,7 @@ public class MailboxServiceTest {
     // Sends are non-blocking as long as channel capacity is not breached
     SendingMailbox sendingMailbox =
         _mailboxService2.getSendingMailbox("localhost", 
_mailboxService1.getPort(), mailboxId, Long.MAX_VALUE, _stats);
-    for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
+    for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS; i++) {
       sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));
     }
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
@@ -349,18 +353,16 @@ public class MailboxServiceTest {
         aVoid -> receivingMailbox.getNumPendingBlocks() == 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS, 1000L,
         "Failed to deliver mails");
 
-    for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
+    for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS; i++) {
       assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
       List<Object[]> rows = getRows(receivingMailbox);
       assertEquals(rows.size(), 1);
       assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
     }
-    assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+    assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
     MseBlock block = readBlock(receivingMailbox);
     assertNotNull(block);
     assertTrue(block.isSuccess());
-    assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
-    assertNull(receivingMailbox.poll());
   }
 
   @Test
@@ -391,17 +393,16 @@ public class MailboxServiceTest {
     assertEquals(numCallbacks.get(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
 
     for (int i = 0; i < ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - 1; i++) {
-      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i);
+      assertEquals(receivingMailbox.getNumPendingBlocks(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS - i - 1);
       List<Object[]> rows = getRows(receivingMailbox);
       assertEquals(rows.size(), 1);
       assertEquals(rows.get(0), new Object[]{Integer.toString(i)});
     }
-    assertEquals(receivingMailbox.getNumPendingBlocks(), 1);
+    assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
     MseBlock block = readBlock(receivingMailbox);
     assertNotNull(block);
     assertTrue(block.isSuccess());
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
-    assertNull(receivingMailbox.poll());
   }
 
   @Test
@@ -536,7 +537,7 @@ public class MailboxServiceTest {
     sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{"0"}));
     receiveMailLatch.await();
     receivingMailbox.cancel();
-    assertEquals(numCallbacks.get(), 1);
+    assertEquals(numCallbacks.get(), 2);
 
     // Data blocks will be cleaned up
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
@@ -547,7 +548,7 @@ public class MailboxServiceTest {
     // Cancel is idempotent for both sending and receiving mailbox, so safe to 
call multiple times
     sendingMailbox.cancel(new Exception("TEST ERROR"));
     receivingMailbox.cancel();
-    assertEquals(numCallbacks.get(), 1);
+    assertEquals(numCallbacks.get(), 2);
     assertEquals(receivingMailbox.getNumPendingBlocks(), 0);
   }
 
@@ -619,7 +620,8 @@ public class MailboxServiceTest {
     }
 
     // Next send will be blocked on the receiver side and cause exception 
after timeout
-    sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
+    // We need to send a data block, given we don't block on EOS
+    sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{payload}));
     receiveMailLatch.await();
     assertEquals(numCallbacks.get(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
new file mode 100644
index 00000000000..b9c35169afd
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ReceivingMailboxTest {
+
+  private static final DataSchema DATA_SCHEMA =
+      new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+  private static final MseBlock.Data DATA_BLOCK = new 
RowHeapDataBlock(List.of(), DATA_SCHEMA, null);
+  private ReceivingMailbox.Reader _reader;
+
+  @BeforeMethod
+  public void setUp() {
+    _reader = Mockito.mock(ReceivingMailbox.Reader.class);
+  }
+
+  @Test
+  public void tooManyDataBlocksTheWriter()
+      throws InterruptedException, TimeoutException {
+    int size = 2;
+    ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", size);
+
+    // Offer up to capacity
+    for (int i = 0; i < size; i++) {
+      ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+      assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS, 
"Should be able to offer up to capacity");
+    }
+    // Offer one more should be rejected
+    try {
+      ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+      fail("Should have thrown timeout exception, but " + status + " was 
returned");
+    } catch (TimeoutException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void offerAfterEos() throws InterruptedException, TimeoutException {
+    ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+
+    ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+    assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS, 
"Should be able to offer before EOS");
+
+    status = receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+    assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK, 
"Should be able to offer EOS");
+
+    // Data offer after EOS should be rejected
+    status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+    assertEquals(status, 
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+
+    // Success offer after EOS should be rejected
+    status = receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+    assertEquals(status, 
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+
+    // Error offer after EOS should be rejected
+    status = 
receivingMailbox.offer(ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, 
"test"), List.of(), 10);
+    assertEquals(status, 
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+  }
+
+  @Test
+  public void shouldReadDataInOrder() throws InterruptedException, 
TimeoutException {
+    ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+    receivingMailbox.registeredReader(_reader);
+
+    MseBlock[] offeredBlocks = new MseBlock[] {
+      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+    };
+    for (MseBlock block : offeredBlocks) {
+      ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(block, List.of(), 10);
+      assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS, 
"Should be able to offer before EOS");
+    }
+
+    for (MseBlock offered : offeredBlocks) {
+      ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+      assertNotNull(read, "Should be able to read offered blocks");
+      assertEquals(read.getBlock(), offered, "Should read blocks in the order 
they were offered");
+    }
+
+    assertNull(receivingMailbox.poll(), "No more blocks to read, should return 
null");
+  }
+
+  @Test
+  public void lateEosRead() throws InterruptedException, TimeoutException {
+    ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
+    receivingMailbox.registeredReader(_reader);
+
+    MseBlock[] offeredBlocks = new MseBlock[] {
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+    };
+    for (MseBlock block : offeredBlocks) {
+      ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(block, List.of(), 10);
+      assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.SUCCESS, 
"Should be able to offer before EOS");
+    }
+
+    for (MseBlock offered : offeredBlocks) {
+      ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+      assertNotNull(read, "Should be able to read offered blocks");
+      assertEquals(read.getBlock(), offered, "Should read blocks in the order 
they were offered");
+    }
+
+    // Offer EOS after all data blocks are read
+    ReceivingMailbox.ReceivingMailboxStatus status = 
receivingMailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10);
+    assertEquals(status, ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK, 
"Should be able to offer EOS");
+
+    ReceivingMailbox.MseBlockWithStats read = receivingMailbox.poll();
+    assertNotNull(read, "Should be able to read EOS");
+    assertEquals(read.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS 
block");
+
+    // Offer after EOS should be rejected
+    status = receivingMailbox.offer(DATA_BLOCK, List.of(), 10);
+    assertEquals(status, 
ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+
+    // Poll again should return the EOS
+    ReceivingMailbox.MseBlockWithStats latePoll = receivingMailbox.poll();
+    assertNotNull(latePoll, "Should be able to read EOS");
+    assertEquals(latePoll.getBlock(), SuccessMseBlock.INSTANCE, "Should read 
EOS block");
+  }
+
+  /**
+   * Verifies that data blocks buffered before a successful EOS are not dropped: they are delivered in the order they
+   * were offered, followed by the EOS block itself.
+   */
+  @Test
+  public void bufferedDataIsKeptOnSuccess() throws InterruptedException, TimeoutException {
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", 10);
+    mailbox.registeredReader(_reader);
+
+    int numDataBlocks = 3;
+    MseBlock[] sent = new MseBlock[numDataBlocks];
+    for (int i = 0; i < numDataBlocks; i++) {
+      sent[i] = new RowHeapDataBlock(List.of(), DATA_SCHEMA, null);
+    }
+    for (int i = 0; i < numDataBlocks; i++) {
+      assertEquals(mailbox.offer(sent[i], List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
+          "Should be able to offer before EOS");
+    }
+
+    // The successful EOS is accepted while the data blocks are still buffered.
+    assertEquals(mailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10),
+        ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK, "Should be able to offer EOS");
+
+    // Draining the mailbox yields every buffered data block, in offer order, and only then the EOS.
+    for (int i = 0; i < numDataBlocks; i++) {
+      ReceivingMailbox.MseBlockWithStats polled = mailbox.poll();
+      assertNotNull(polled, "Should be able to read offered blocks");
+      assertEquals(polled.getBlock(), sent[i], "Should read blocks in the order they were offered");
+    }
+    ReceivingMailbox.MseBlockWithStats eos = mailbox.poll();
+    assertNotNull(eos, "Should be able to read EOS");
+    assertEquals(eos.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS block");
+  }
+
+  /**
+   * Verifies the behavior when an error block terminates a stream that still has buffered data. The first block
+   * polled afterwards must be the error itself, not any of the data blocks offered before it.
+   */
+  @Test
+  public void bufferedDataIsDiscardedOnError() throws InterruptedException, TimeoutException {
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", 10);
+    mailbox.registeredReader(_reader);
+    ErrorMseBlock errorBlock = ErrorMseBlock.fromException(new RuntimeException("Test error"));
+
+    MseBlock[] buffered = {
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+    };
+    for (MseBlock dataBlock : buffered) {
+      assertEquals(mailbox.offer(dataBlock, List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
+          "Should be able to offer before EOS");
+    }
+
+    // Terminate the stream with an error while the data blocks are still buffered.
+    assertEquals(mailbox.offer(errorBlock, List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
+        "Should be able to offer EOS");
+
+    // The very first poll returns the error, not any of the buffered data blocks.
+    ReceivingMailbox.MseBlockWithStats first = mailbox.poll();
+    assertNotNull(first, "Should be able to read EOS");
+    assertEquals(first.getBlock(), errorBlock, "Should read EOS block");
+  }
+
+  /**
+   * Verifies that once a successful EOS has been offered and read, any further offer is rejected with
+   * {@code ALREADY_TERMINATED}, and polling again keeps returning the same EOS block.
+   */
+  @Test
+  public void dataAfterSuccess() throws InterruptedException, TimeoutException {
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", 10);
+    mailbox.registeredReader(_reader);
+
+    // Terminate the stream successfully before any data is sent.
+    assertEquals(mailbox.offer(SuccessMseBlock.INSTANCE, List.of(), 10),
+        ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK, "Should be able to offer EOS");
+
+    ReceivingMailbox.MseBlockWithStats firstPoll = mailbox.poll();
+    assertNotNull(firstPoll, "Should be able to read EOS");
+    assertEquals(firstPoll.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS block");
+
+    // A terminated mailbox refuses new data...
+    assertEquals(mailbox.offer(DATA_BLOCK, List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+
+    // ...and keeps handing out the EOS on subsequent polls.
+    ReceivingMailbox.MseBlockWithStats secondPoll = mailbox.poll();
+    assertNotNull(secondPoll, "Should be able to read EOS");
+    assertEquals(secondPoll.getBlock(), SuccessMseBlock.INSTANCE, "Should read EOS block");
+  }
+
+  /**
+   * Verifies that once an error EOS has been offered and read, any further offer is rejected with
+   * {@code ALREADY_TERMINATED}, and polling again keeps returning the same error block.
+   */
+  @Test
+  public void dataAfterError() throws InterruptedException, TimeoutException {
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", 10);
+    mailbox.registeredReader(_reader);
+
+    // Terminate the stream with an error before any data is sent.
+    ErrorMseBlock failure = ErrorMseBlock.fromException(new RuntimeException("Test error"));
+    assertEquals(mailbox.offer(failure, List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.LAST_BLOCK,
+        "Should be able to offer EOS");
+
+    ReceivingMailbox.MseBlockWithStats firstPoll = mailbox.poll();
+    assertNotNull(firstPoll, "Should be able to read EOS");
+    assertEquals(firstPoll.getBlock(), failure, "Should read EOS block");
+
+    // A terminated mailbox refuses new data...
+    assertEquals(mailbox.offer(DATA_BLOCK, List.of(), 10), ReceivingMailbox.ReceivingMailboxStatus.ALREADY_TERMINATED,
+        "Should not be able to offer after EOS");
+
+    // ...and keeps handing out the error on subsequent polls.
+    ReceivingMailbox.MseBlockWithStats secondPoll = mailbox.poll();
+    assertNotNull(secondPoll, "Should be able to read EOS");
+    assertEquals(secondPoll.getBlock(), failure, "Should read EOS block");
+  }
+
+  /**
+   * Verifies that {@code earlyTerminate()} releases a writer that is waiting because the mailbox already holds
+   * {@code capacity} blocks, and that the released offer reports {@code WAITING_EOS}.
+   */
+  @Test(timeOut = 10_000)
+  public void earlyTerminateUnblocksOffers()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    int capacity = 2;
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", capacity);
+
+    ExecutorService writers = Executors.newCachedThreadPool();
+    try {
+      // Fill the mailbox. No reader is registered and nothing polls, so these blocks stay pending.
+      for (int filled = 0; filled < capacity; filled++) {
+        offer(DATA_BLOCK, mailbox, writers).join();
+      }
+      // One offer past capacity; give it a moment to start waiting before terminating the mailbox.
+      CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> blocked = offer(DATA_BLOCK, mailbox, writers);
+      TimeUnit.MILLISECONDS.sleep(100);
+      mailbox.earlyTerminate();
+      assertEquals(blocked.get(10, TimeUnit.SECONDS), ReceivingMailbox.ReceivingMailboxStatus.WAITING_EOS);
+    } finally {
+      writers.shutdownNow();
+    }
+  }
+
+  /**
+   * Verifies that a writer waiting on a full mailbox is released once the reader drains it, and that its offer then
+   * succeeds.
+   */
+  @Test(timeOut = 10_000)
+  public void readingUnblocksWriters()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    int maxPendingBlocks = 2;
+    ReceivingMailbox mailbox = new ReceivingMailbox("id", maxPendingBlocks);
+    mailbox.registeredReader(_reader);
+
+    ExecutorService offerEx = Executors.newSingleThreadExecutor();
+    try {
+      // Fill the mailbox to capacity, checking each of these offers actually succeeded (previously their futures
+      // were discarded, so a failing offer would have gone unnoticed).
+      for (int i = 0; i < maxPendingBlocks; i++) {
+        assertEquals(offer(DATA_BLOCK, mailbox, offerEx).get(10_000, TimeUnit.MILLISECONDS),
+            ReceivingMailbox.ReceivingMailboxStatus.SUCCESS, "Offers within capacity should succeed");
+      }
+      // One offer more than the mailbox can hold: it has to wait until the reader frees a slot.
+      CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> blocked = offer(DATA_BLOCK, mailbox, offerEx);
+
+      int numRead = 0;
+      while (numRead < maxPendingBlocks + 1) {
+        ReceivingMailbox.MseBlockWithStats poll = mailbox.poll();
+        if (poll == null) {
+          // Nothing buffered right now (the waiting writer may not have enqueued yet); back off briefly.
+          Thread.sleep(10);
+        } else {
+          numRead++;
+          assertEquals(poll.getBlock(), DATA_BLOCK, "The read block should match the sent block");
+        }
+      }
+      assertEquals(mailbox.getNumPendingBlocks(), 0, "All blocks should have been read");
+      // The reader can observe the last block before the writer thread returns from offer() and completes the
+      // future, so asserting isDone() here would be racy. Wait for completion instead.
+      assertEquals(blocked.get(10_000, TimeUnit.MILLISECONDS), ReceivingMailbox.ReceivingMailboxStatus.SUCCESS,
+          "The unblocked offer should succeed");
+    } finally {
+      offerEx.shutdownNow();
+    }
+  }
+
+  /**
+   * Asynchronously offers {@code block} to {@code receivingMailbox} on {@code executor}, with no stats and
+   * {@code 10_000} as the offer timeout argument. Checked exceptions from the offer are rethrown wrapped in a
+   * {@link RuntimeException}, so they surface as an exceptional completion of the returned future.
+   */
+  CompletableFuture<ReceivingMailbox.ReceivingMailboxStatus> offer(MseBlock block, ReceivingMailbox receivingMailbox,
+      ExecutorService executor) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        return receivingMailbox.offer(block, List.of(), 10_000);
+      } catch (TimeoutException timeout) {
+        throw new RuntimeException(timeout);
+      } catch (InterruptedException interrupted) {
+        // Restore the interrupt flag before bailing out so the executor thread still sees it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(interrupted);
+      }
+    }, executor);
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 19b11aa9215..8861357f4c6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.operator;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -104,26 +103,6 @@ public class MailboxSendOperatorTest {
     assertTrue(captor.getValue().isError(), "expected to send error block to 
exchange");
   }
 
-  @Test
-  public void shouldNotSendErrorBlockWhenTimedOut()
-      throws Exception {
-    // Given:
-    MseBlock.Data dataBlock = getDummyDataBlock();
-    when(_input.nextBlock()).thenReturn(dataBlock);
-    doThrow(new TimeoutException()).when(_exchange).send(any());
-
-    // When:
-    MseBlock block = getOperator().nextBlock();
-
-    // Then:
-    assertTrue(block.isError(), "expected error block to propagate");
-    ArgumentCaptor<MseBlock.Data> captor = 
ArgumentCaptor.forClass(MseBlock.Data.class);
-    verify(_exchange).send(captor.capture());
-    assertSame(captor.getValue(), dataBlock, "expected to send data block to 
exchange");
-
-    verify(_exchange, never()).send(any(), anyList());
-  }
-
   @Test
   public void shouldSendEosBlock()
       throws Exception {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 4ea75fcad74..93ffbb97c7a 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -20,9 +20,7 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.mailbox.SendingMailbox;
@@ -178,8 +176,7 @@ public class BlockExchangeTest {
     }
 
     @Override
-    protected void route(List<SendingMailbox> destinations, MseBlock.Data 
block)
-        throws IOException, TimeoutException {
+    protected void route(List<SendingMailbox> destinations, MseBlock.Data 
block) {
       for (SendingMailbox mailbox : destinations) {
         sendBlock(mailbox, block);
       }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index e1fc22f1bfa..6edba101848 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.assertj.core.api.Assertions;
 import org.testng.Assert;
 import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
@@ -267,11 +268,11 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
 
   // TODO: name the test using testCaseName for testng reports
   @Test(dataProvider = "testResourceQueryTestCaseProviderInputOnly")
-  public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored, 
String sql, String h2Sql, String expect,
-      boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
+  public void testQueryTestCasesWithH2(String testCaseName, boolean isIgnored, 
String sql, String h2Sql,
+      @Nullable String expectErrorMsg, boolean keepOutputRowOrder, boolean 
ignoreV2Optimizer)
       throws Exception {
     // query pinot
-    runQuery(sql, expect, false).ifPresent(queryResult -> {
+    runQuery(sql, expectErrorMsg, false).ifPresent(queryResult -> {
       try {
         compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql), 
keepOutputRowOrder);
       } catch (Exception e) {
@@ -393,22 +394,34 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
     });
   }
 
-  private Optional<QueryDispatcher.QueryResult> runQuery(String sql, final 
String except, boolean trace)
+  private Optional<QueryDispatcher.QueryResult> runQuery(String sql, @Nullable 
String expectedErrorMsg, boolean trace)
       throws Exception {
     try {
       // query pinot
       QueryDispatcher.QueryResult queryResult = queryRunner(sql, trace);
-      Assert.assertNull(except, "Expected error with message '" + except + "'. 
But instead rows were returned: "
-          + JsonUtils.objectToPrettyString(queryResult.getResultTable()));
+      if (expectedErrorMsg == null) {
+        Assert.assertTrue(queryResult.getProcessingException() == null,
+            "Unexpected exception: " + 
JsonUtils.objectToPrettyString(queryResult.getProcessingException()));
+      } else {
+        Assert.assertTrue(queryResult.getProcessingException() != null,
+            "Expected error with message '" + expectedErrorMsg + "'. But 
instead no error was thrown.");
+        Pattern pattern = Pattern.compile(expectedErrorMsg, Pattern.DOTALL);
+        
Assertions.assertThat(queryResult.getProcessingException().getMessage()).matches(pattern);
+        return Optional.empty();
+      }
+      Assert.assertNull(expectedErrorMsg, "Expected error with message '" + 
expectedErrorMsg
+          + "'. But instead rows were returned: " + 
JsonUtils.objectToPrettyString(queryResult.getResultTable()));
+      Assert.assertNotNull(queryResult.getResultTable(),
+          "Result table is null: " + 
JsonUtils.objectToPrettyString(queryResult));
       return Optional.of(queryResult);
     } catch (Exception e) {
-      if (except == null) {
+      if (expectedErrorMsg == null) {
         throw e;
       } else {
-        Pattern pattern = Pattern.compile(except, Pattern.DOTALL);
+        Pattern pattern = Pattern.compile(expectedErrorMsg, Pattern.DOTALL);
         Assert.assertTrue(pattern.matcher(e.getMessage()).matches(),
             String.format("Caught exception '%s', but it did not match the 
expected pattern '%s'.", e.getMessage(),
-                except));
+                expectedErrorMsg));
       }
     }
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
index 14181c31228..3d205129d09 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/ColocatedJoinQuickStart.java
@@ -92,6 +92,8 @@ public class ColocatedJoinQuickStart extends 
MultistageEngineQuickStart {
 
     
overrides.put(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
     
overrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
+
+    
overrides.put(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
 1024);
     return overrides;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to