This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b289fc2e5 [multistage] Fix Leaks in Mailbox (#10322)
5b289fc2e5 is described below

commit 5b289fc2e5acb14388f84cb60b7c5d2d71f5e6cf
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Fri Mar 10 23:31:25 2023 +0530

    [multistage] Fix Leaks in Mailbox (#10322)
    
    - Mailbox ownership model:
     * ----------------------------------------------------------------------------
     * (Operator Layer)
     *            MailboxSendOperator ---------> MailboxReceiveOperator
     *                   |                                  |
     *                   |                                  |
     * ------------------|----------------------------------|----------------------
     *                   |                                  |
     * (MailboxService)  |                                  |  ( WAIT ON INIT )
     *                  \_/                                \_/
     *           SendingMailbox                      ReceivingMailbox
     * ------------------|---------------------------------/^\---------------------
     * (Physical Layer)  |                                  |
     * (e.g. GRPC)       |                                  |  ( INITIALIZE )
     *                   |                                  |
     *                  \_/                                 |
     *           StreamObserver -------------------> StreamObserver
     * ----------------------------------------------------------------------------
    
    - Work items done in this PR:
    * clean up the MailboxService API to sanitize cancel/release.
    * move SendingMailbox management completely out of MailboxService.
    * make the sending mailbox open its gRPC observers lazily.
    * add a queue offer timeout to avoid unbounded memory growth (OOM).
    * handle close/cancel on the leaf-stage operator, plus a minor log change.
    * enhance the in-memory transfer stream.
---
 .../pinot/query/mailbox/GrpcMailboxService.java    | 112 +++++++----
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  39 +++-
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  73 ++++---
 .../query/mailbox/InMemoryMailboxService.java      |  65 ++++--
 .../query/mailbox/InMemoryReceivingMailbox.java    |  34 ++--
 .../query/mailbox/InMemorySendingMailbox.java      |  49 +++--
 .../apache/pinot/query/mailbox/MailboxService.java |  44 +++-
 .../query/mailbox/MultiplexingMailboxService.java  |  15 +-
 .../pinot/query/mailbox/ReceivingMailbox.java      |  50 +++--
 .../apache/pinot/query/mailbox/SendingMailbox.java |  42 ++--
 .../mailbox/channel/InMemoryTransferStream.java    | 101 ++++++++++
 .../channel/MailboxContentStreamObserver.java      | 162 ++++++---------
 .../channel/MailboxStatusStreamObserver.java       |  12 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |  65 +++---
 .../runtime/executor/OpChainSchedulerService.java  |   9 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  16 ++
 .../runtime/operator/MailboxSendOperator.java      |  27 ++-
 .../runtime/operator/exchange/BlockExchange.java   |  31 ++-
 .../operator/exchange/BroadcastExchange.java       |   3 +-
 .../runtime/operator/exchange/HashExchange.java    |   3 +-
 .../runtime/operator/exchange/RandomExchange.java  |   3 +-
 .../operator/exchange/SingletonExchange.java       |   3 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   3 +-
 .../query/runtime/plan/PlanRequestContext.java     |   9 +-
 .../runtime/plan/ServerRequestPlanVisitor.java     |   4 +-
 .../plan/server/ServerPlanRequestContext.java      |   6 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      | 224 +++++++++++++++++++--
 .../query/mailbox/InMemoryMailboxServiceTest.java  | 121 ++++++++++-
 .../mailbox/MultiplexingMailboxServiceTest.java    |   8 +-
 .../executor/OpChainSchedulerServiceTest.java      |   4 +-
 .../runtime/operator/MailboxSendOperatorTest.java  |  33 ++-
 .../operator/exchange/BlockExchangeTest.java       |  12 +-
 .../operator/exchange/BroadcastExchangeTest.java   |   3 +-
 .../operator/exchange/HashExchangeTest.java        |   3 +-
 .../operator/exchange/RandomExchangeTest.java      |   3 +-
 .../operator/exchange/SingletonExchangeTest.java   |   3 +-
 36 files changed, 1020 insertions(+), 374 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 37fd81b3d2..744c681752 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -18,45 +18,61 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be 
cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to 
remote hosts. All indexed by the
- * mailboxID in the format of: 
<code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never 
registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first 
time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only 
tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on 
a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for 
example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to 
distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a 
time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel 
to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = 
Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> 
_receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying 
gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before 
the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
 TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() 
{
+            @Override
+            public void onRemoval(RemovalNotification<String, 
GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that 
we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in 
MailboxContentStreamObserver.
+                LOGGER.warn("Removing dangling GrpcReceivingMailbox: {}", 
notification.getKey());
+                notification.getValue().cancel();
+              }
+            }
+          })
+          .build();
   private final Consumer<MailboxIdentifier> _gotMailCallback;
 
   public GrpcMailboxService(String hostname, int mailboxPort, 
PinotConfiguration extraConfig,
@@ -88,29 +104,57 @@ public class GrpcMailboxService implements 
MailboxService<TransferableBlock> {
   }
 
   /**
-   * Register a mailbox, mailbox needs to be registered before use.
-   * @param mailboxId the id of the mailbox.
+   * {@inheritDoc}
    */
-  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId) {
-    ManagedChannel channel = getChannel(mailboxId.toString());
-    PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
-    CountDownLatch latch = new CountDownLatch(1);
-    StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
-        stub.open(new MailboxStatusStreamObserver(latch));
-    GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), 
mailboxContentStreamObserver, latch);
+  @Override
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId, long deadlineMs) {
+    MailboxStatusStreamObserver statusStreamObserver = new 
MailboxStatusStreamObserver();
+
+    GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), 
statusStreamObserver, (deadline) -> {
+      ManagedChannel channel = getChannel(mailboxId.toString());
+      PinotMailboxGrpc.PinotMailboxStub stub =
+          PinotMailboxGrpc.newStub(channel)
+              .withDeadlineAfter(Math.max(0L, deadline - 
System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+      return stub.open(statusStreamObserver);
+    }, deadlineMs);
     return mailbox;
   }
 
   /**
-   * Register a mailbox, mailbox needs to be registered before use.
-   * @param mailboxId the id of the mailbox.
+   * {@inheritDoc} See {@link GrpcMailboxService} for details on the design.
    */
+  @Override
   public ReceivingMailbox<TransferableBlock> 
getReceivingMailbox(MailboxIdentifier mailboxId) {
-    return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
-        (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
+    try {
+      return _receivingMailboxCache.get(mailboxId.toString(),
+          () -> new GrpcReceivingMailbox(mailboxId.toString(), 
_gotMailCallback));
+    } catch (ExecutionException e) {
+      LOGGER.error(String.format("Error getting receiving mailbox: %s", 
mailboxId), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * If there's a cached receiving mailbox and it isn't closed (i.e. query 
didn't finish successfully), then this
+   * calls a cancel to ensure that the underlying gRPC stream is closed. After 
that the receiving mailbox is removed
+   * from the cache.
+   * <p>
+   *   Also refer to the definition in the interface:
+   * </p>
+   * <p>
+   *   {@inheritDoc}
+   * </p>
+   */
+  @Override
+  public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+    GrpcReceivingMailbox receivingMailbox = 
_receivingMailboxCache.getIfPresent(mailboxId.toString());
+    if (receivingMailbox != null && !receivingMailbox.isClosed()) {
+      receivingMailbox.cancel();
+    }
+    _receivingMailboxCache.invalidate(mailboxId.toString());
   }
 
-  public ManagedChannel getChannel(String mailboxId) {
+  private ManagedChannel getChannel(String mailboxId) {
     return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index 464c092e87..f6dde0c823 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -18,34 +18,41 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link ReceivingMailbox}.
+ * GRPC implementation of the {@link ReceivingMailbox}. This mailbox doesn't 
hold any resources upon creation.
+ * Instead an explicit {@link #init} call is made when the sender sends the 
first data-block which attaches
+ * references to the {@link StreamObserver} to this mailbox.
  */
 public class GrpcReceivingMailbox implements 
ReceivingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcReceivingMailbox.class);
   private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
   private final String _mailboxId;
-  private Consumer<MailboxIdentifier> _gotMailCallback;
+  private final Consumer<MailboxIdentifier> _gotMailCallback;
   private final CountDownLatch _initializationLatch;
-  private final AtomicInteger _totalMsgReceived = new AtomicInteger(0);
 
   private MailboxContentStreamObserver _contentStreamObserver;
+  private StreamObserver<Mailbox.MailboxStatus> _statusStreamObserver;
 
   public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> 
gotMailCallback) {
     _mailboxId = mailboxId;
@@ -53,9 +60,11 @@ public class GrpcReceivingMailbox implements 
ReceivingMailbox<TransferableBlock>
     _initializationLatch = new CountDownLatch(1);
   }
 
-  public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver 
streamObserver) {
+  public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver 
streamObserver,
+      StreamObserver<Mailbox.MailboxStatus> statusStreamObserver) {
     if (_initializationLatch.getCount() > 0) {
       _contentStreamObserver = streamObserver;
+      _statusStreamObserver = statusStreamObserver;
       _initializationLatch.countDown();
     }
     return _gotMailCallback;
@@ -70,29 +79,37 @@ public class GrpcReceivingMailbox implements 
ReceivingMailbox<TransferableBlock>
    *  2. If the received block from the sender is a data-block with 0 rows.
    * </p>
    */
+  @Nullable
   @Override
-  public TransferableBlock receive()
-      throws Exception {
+  public TransferableBlock receive() throws Exception {
     if (!waitForInitialize()) {
       return null;
     }
     MailboxContent mailboxContent = _contentStreamObserver.poll();
-    _totalMsgReceived.incrementAndGet();
     return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
   }
 
   @Override
   public boolean isInitialized() {
-    return _initializationLatch.getCount() <= 0;
+    return _initializationLatch.getCount() == 0;
   }
 
   @Override
   public boolean isClosed() {
-    return isInitialized() && _contentStreamObserver.isCompleted();
+    return isInitialized() && _contentStreamObserver.hasConsumedAllData();
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (isInitialized()) {
+      try {
+        _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+      } catch (Exception e) {
+        // TODO: This can happen if the call is already closed. Consider 
removing this log altogether or find a way
+        //  to check if the stream is already closed.
+        LOGGER.info("Tried to cancel receiving mailbox", e);
+      }
+    }
   }
 
   private boolean waitForInitialize()
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index d2f4de89c2..3fa6d6c229 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -18,79 +18,94 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is 
created on the first call to {@link #send}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
-  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private final CountDownLatch _finishLatch;
-  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private final Function<Long, StreamObserver<MailboxContent>> 
_mailboxContentStreamObserverSupplier;
+  private final MailboxStatusStreamObserver _statusObserver;
+  private final long _deadlineMs;
 
-  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> 
mailboxContentStreamObserver,
-      CountDownLatch latch) {
+  public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver 
statusObserver,
+      Function<Long, StreamObserver<MailboxContent>> 
contentStreamObserverSupplier, long deadlineMs) {
     _mailboxId = mailboxId;
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-    _finishLatch = latch;
-    _initialized.set(false);
+    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+    _statusObserver = statusObserver;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws UnsupportedOperationException {
+      throws Exception {
     if (!_initialized.get()) {
-      // initialization is special
       open();
     }
+    Preconditions.checkState(!_statusObserver.isFinished(),
+        "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
-    _totalMsgSent.incrementAndGet();
   }
 
   @Override
-  public void complete() {
+  public void complete()
+      throws Exception {
     _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
-  public void open() {
-    // TODO: Get rid of init call.
-    // send a begin-of-stream message.
-    
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, 
"true").build());
-    _initialized.set(true);
+  public boolean isInitialized() {
+    return _initialized.get();
   }
 
   @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public void cancel(Throwable t) {
+    if (_initialized.get() && !_statusObserver.isFinished()) {
+      LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
+      try {
+        _mailboxContentStreamObserver.onError(Status.fromThrowable(
+            new RuntimeException("Cancelled by the 
sender")).asRuntimeException());
+      } catch (Exception e) {
+        // TODO: We don't necessarily need to log this since this is 
relatively quite likely to happen. Logging this
+        //  anyways as info for now so we can see how frequently this happens.
+        LOGGER.info("Unexpected error issuing onError to 
MailboxContentStreamObserver: {}", e.getMessage());
+      }
+    }
   }
 
   @Override
-  public void waitForFinish(long timeout, TimeUnit unit)
-      throws InterruptedException {
-    _finishLatch.await(timeout, unit);
+  public String getMailboxId() {
+    return _mailboxId;
   }
 
-  @Override
-  public void cancel(Throwable t) {
+  private void open() {
+    _mailboxContentStreamObserver = 
_mailboxContentStreamObserverSupplier.apply(_deadlineMs);
+    _initialized.set(true);
+    // send a begin-of-stream message.
+    
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, 
"true").build());
   }
 
   private MailboxContent toMailboxContent(DataBlock dataBlock) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
index b5bcb9057b..c15546b25a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
@@ -19,21 +19,41 @@
 package org.apache.pinot.query.mailbox;
 
 import com.google.common.base.Preconditions;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class InMemoryMailboxService implements 
MailboxService<TransferableBlock> {
-  // channel manager
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryMailboxService.class);
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = 
Duration.ofMinutes(5);
   private final String _hostname;
   private final int _mailboxPort;
   private final Consumer<MailboxIdentifier> _receivedMailContentCallback;
 
-  private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox 
= new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new 
ConcurrentHashMap<>();
+  private final Cache<String, InMemoryReceivingMailbox> _receivingMailboxCache 
=
+      
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
 TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, 
InMemoryReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, 
InMemoryReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                LOGGER.info("Evicting dangling InMemoryReceivingMailbox: {}", 
notification.getKey());
+                // TODO: This should be tied to the query deadline. Unlike 
GrpcMailboxService, the change here is
+                //  simpler.
+                notification.getValue().cancel();
+              }
+            }
+          })
+          .build();
 
   public InMemoryMailboxService(String hostname, int mailboxPort,
       Consumer<MailboxIdentifier> receivedMailContentCallback) {
@@ -64,25 +84,32 @@ public class InMemoryMailboxService implements 
MailboxService<TransferableBlock>
     return _mailboxPort;
   }
 
-  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId) {
+  @Override
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId, long deadlineMs) {
     Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory 
mailbox service for non-local transport");
-    String mId = mailboxId.toString();
-    // for now, we use an unbounded blocking queue as the means of 
communication between
-    // in memory mailboxes - the reason for this is that unless we implement 
flow control,
-    // blocks will sit in memory either way (blocking the sender from sending 
doesn't prevent
-    // more blocks from being generated from upstream). on the other hand, 
having a capacity
-    // for the queue causes the sending thread to occupy a task pool thread 
and prevents other
-    // threads (most importantly, the receiving thread) from running - which 
can cause unnecessary
-    // failure situations
-    // TODO: when we implement flow control, we should swap this out with a 
bounded abstraction
     return new InMemorySendingMailbox(mailboxId.toString(),
-        _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()), 
getReceivedMailContentCallback());
+        () -> new InMemoryTransferStream(mailboxId, this, deadlineMs),
+        getReceivedMailContentCallback());
   }
 
+  @Override
   public ReceivingMailbox<TransferableBlock> 
getReceivingMailbox(MailboxIdentifier mailboxId) {
     Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory 
mailbox service for non-local transport");
     String mId = mailboxId.toString();
-    BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new 
LinkedBlockingQueue<>());
-    return _receivingMailbox.computeIfAbsent(mId, id -> new 
InMemoryReceivingMailbox(mId, mailboxQueue));
+    try {
+      return _receivingMailboxCache.get(mId, () -> new 
InMemoryReceivingMailbox(mId));
+    } catch (ExecutionException e) {
+      LOGGER.error(String.format("Error getting in-memory receiving 
mailbox=%s", mailboxId), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+    InMemoryReceivingMailbox receivingMailbox = 
_receivingMailboxCache.getIfPresent(mailboxId.toString());
+    if (receivingMailbox != null) {
+      receivingMailbox.cancel();
+      _receivingMailboxCache.invalidate(mailboxId.toString());
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
index 43f32c61c5..57e6cce087 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
@@ -18,30 +18,30 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.concurrent.BlockingQueue;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
 public class InMemoryReceivingMailbox implements 
ReceivingMailbox<TransferableBlock> {
   private final String _mailboxId;
-  private final BlockingQueue<TransferableBlock> _queue;
-  private volatile boolean _closed;
+  private InMemoryTransferStream _transferStream;
+  private volatile boolean _closed = false;
 
-  public InMemoryReceivingMailbox(String mailboxId, 
BlockingQueue<TransferableBlock> queue) {
+  public InMemoryReceivingMailbox(String mailboxId) {
     _mailboxId = mailboxId;
-    _queue = queue;
-    _closed = false;
   }
 
-  @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public void init(InMemoryTransferStream transferStream) {
+    _transferStream = transferStream;
   }
 
   @Override
   public TransferableBlock receive()
       throws Exception {
-    TransferableBlock block = _queue.poll();
+    if (_transferStream == null) {
+      return null;
+    }
+    TransferableBlock block = _transferStream.poll();
 
     if (block == null) {
       return null;
@@ -56,15 +56,23 @@ public class InMemoryReceivingMailbox implements 
ReceivingMailbox<TransferableBl
 
   @Override
   public boolean isInitialized() {
-    return true;
+    return _transferStream != null;
   }
 
   @Override
   public boolean isClosed() {
-    return _closed && _queue.size() == 0;
+    return _closed;
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (_transferStream != null) {
+      _transferStream.cancel();
+    }
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 18dcd8ffd3..035ee16f2b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -18,56 +18,61 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
 public class InMemorySendingMailbox implements 
SendingMailbox<TransferableBlock> {
   private final Consumer<MailboxIdentifier> _gotMailCallback;
-  private final String _mailboxId;
+  private final JsonMailboxIdentifier _mailboxId;
 
-  // TODO: changed to 2-way communication channel.
-  private BlockingQueue<TransferableBlock> _queue;
+  private Supplier<InMemoryTransferStream> _transferStreamProvider;
+  private InMemoryTransferStream _transferStream;
 
-  public InMemorySendingMailbox(String mailboxId, 
BlockingQueue<TransferableBlock> queue,
+  public InMemorySendingMailbox(String mailboxId, 
Supplier<InMemoryTransferStream> transferStreamProvider,
       Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _queue = queue;
+    _mailboxId = JsonMailboxIdentifier.parse(mailboxId);
+    _transferStreamProvider = transferStreamProvider;
     _gotMailCallback = gotMailCallback;
   }
 
-  @Override
-  public void open() {
-  }
-
   @Override
   public String getMailboxId() {
-    return _mailboxId;
+    return _mailboxId.toString();
   }
 
   @Override
   public void send(TransferableBlock data)
-      throws UnsupportedOperationException {
-    if (!_queue.offer(data)) {
-      // this should never happen, since we use a LinkedBlockingQueue
-      // which does not have capacity bounds
-      throw new IllegalStateException("Failed to insert into in-memory mailbox 
" + _mailboxId);
+      throws Exception {
+    if (!isInitialized()) {
+      initialize();
     }
-    _gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
+    _transferStream.send(data);
+    _gotMailCallback.accept(_mailboxId);
   }
 
   @Override
-  public void complete() {
+  public void complete() throws Exception {
+    _transferStream.complete();
   }
 
   @Override
-  public void waitForFinish(long timeout, TimeUnit unit)
-      throws InterruptedException {
+  public boolean isInitialized() {
+    return _transferStream != null;
   }
 
   @Override
   public void cancel(Throwable t) {
+    if (isInitialized() && !_transferStream.isCancelled()) {
+      _transferStream.cancel();
+    }
+  }
+
+  private void initialize() {
+    if (_transferStream == null) {
+      _transferStream = _transferStreamProvider.get();
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 234dd78e98..b4d16b0a85 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
 /**
  * Mailbox service that handles transfer for mailbox contents.
  *
@@ -31,12 +35,12 @@ public interface MailboxService<T> {
   void start();
 
   /**
-   * Shutting down the mailbox service.s
+   * Shutting down the mailbox service.
    */
   void shutdown();
 
   /**
-   * Get the host name on which this mailbox service is runnning on.
+   * Get the host name on which this mailbox service is running on.
    *
    * @return the host.
    */
@@ -50,10 +54,7 @@ public interface MailboxService<T> {
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist 
already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
@@ -61,10 +62,37 @@ public interface MailboxService<T> {
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The 
returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire 
any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for 
the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as 
the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long 
deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created 
before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts 
receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox 
hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its 
references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or 
cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its 
close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never 
registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks 
of resources. E.g. it could setup a
+   *     periodic job to detect such mailbox and do any clean-up. Note that 
for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any 
internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
index e80a65ce71..c4f9c6fdec 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
@@ -72,11 +72,20 @@ public class MultiplexingMailboxService implements 
MailboxService<TransferableBl
   }
 
   @Override
-  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId) {
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier 
mailboxId, long deadlineMs) {
     if (mailboxId.isLocal()) {
-      return _inMemoryMailboxService.getSendingMailbox(mailboxId);
+      return _inMemoryMailboxService.getSendingMailbox(mailboxId, deadlineMs);
     }
-    return _grpcMailboxService.getSendingMailbox(mailboxId);
+    return _grpcMailboxService.getSendingMailbox(mailboxId, deadlineMs);
+  }
+
+  @Override
+  public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+    if (mailboxId.isLocal()) {
+      _inMemoryMailboxService.releaseReceivingMailbox(mailboxId);
+      return;
+    }
+    _grpcMailboxService.releaseReceivingMailbox(mailboxId);
   }
 
   public static MultiplexingMailboxService newInstance(String hostname, int 
port,
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 377430883c..98893aa5e8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -18,42 +18,58 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import javax.annotation.Nullable;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+
+
 /**
- * Mailbox is used to send and receive data.
+ * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is 
with the MailboxService, which is unlike
+ * the {@link SendingMailbox} whose ownership lies with the {@link 
MailboxSendOperator}. This is because the
+ * ReceivingMailbox can be initialized even before the corresponding OpChain 
is registered on the receiver, whereas
+ * the SendingMailbox is initialized when the MailboxSendOperator is running. 
Also see {@link #isInitialized()}.
  *
- * Mailbox should be instantiated on both side of MailboxServer.
- *
- * @param <T> type of data carried over the mailbox.
+ * @param <T> the unit of data that each {@link #receive()} call returns.
  */
 public interface ReceivingMailbox<T> {
 
   /**
    * Get the unique identifier for the mailbox.
-   *
-   * @return Mailbox ID.
    */
   String getMailboxId();
 
   /**
-   * Receive a data packet from the mailbox. Depending on the implementation, 
this may return null. The caller should
-   * use {@link ReceivingMailbox#isClosed()} to determine if the sender is 
done sending and the channel is closed.
-   * @return data packet.
-   * @throws Exception
+   * Returns a unit of data. Implementations are allowed to return null, in 
which case {@link MailboxReceiveOperator}
+   * will assume that this mailbox doesn't have any data to return and it will 
instead poll the other mailbox (if any).
    */
-  T receive()
-      throws Exception;
+  @Nullable
+  T receive() throws Exception;
 
   /**
-   * Check if receiving mailbox is initialized.
-   * @return
+   * A ReceivingMailbox is considered initialized when it has a reference to 
the underlying channel used for receiving
+   * the data. The underlying channel may be a gRPC stream, in-memory queue, 
etc. Once a receiving mailbox is
+   * initialized, it has the ability to close the underlying channel via the 
{@link #cancel()} method.
    */
   boolean isInitialized();
 
   /**
-   * Check if mailbox is closed.
-   * @return
+   * A ReceivingMailbox is considered closed if it has sent all the data to 
the receiver and doesn't have any more data
+   * to send.
    */
   boolean isClosed();
 
-  void cancel(Throwable e);
+  /**
+   * A ReceivingMailbox may hold a reference to the underlying channel. 
Usually the channel would be automatically
+   * closed once all the data has been received by the receiver, and in such 
cases {@link #isClosed()} returns true.
+   * However in failure scenarios the underlying channel may not be released, 
and the receiver can use this method to
+   * ensure the same.
+   *
+   * This API should ensure that the underlying channel is "released" if it 
hasn't been already. If the channel has
+   * already been released, the API shouldn't throw and instead return 
gracefully.
+   *
+   * <p>
+   *   This method may be called multiple times, so implementations should 
ensure this is idempotent.
+   * </p>
+   */
+  void cancel();
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 6cc162b0ec..24651348ac 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,20 +18,15 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 
 
 /**
- * Mailbox is used to send and receive data.
+ * Mailbox that's used to send data.
  *
- * Mailbox should be instantiated on both side of MailboxServer.
- *
- * @param <T> type of data carried over the mailbox.
+ * @param <T> unit of data sent in one {@link #send} call.
  */
 public interface SendingMailbox<T> {
-
-  void open();
-
   /**
    * get the unique identifier for the mailbox.
    *
@@ -40,19 +35,36 @@ public interface SendingMailbox<T> {
   String getMailboxId();
 
   /**
-   * send a data packet through the mailbox.
-   * @param data
-   * @throws UnsupportedOperationException
+   * Send a single unit of data to a receiver. Note that SendingMailbox are 
required to acquire resources lazily in
+   * this call and they should <b>not</b> acquire any resources when they are 
created. This method should throw if there
+   * was an error sending the data, since that would allow {@link 
BlockExchange} to exit early.
    */
   void send(T data)
-      throws UnsupportedOperationException;
+      throws Exception;
 
   /**
-   * Complete delivery of the current mailbox.
+   * Called when there is no more data to be sent by the {@link 
BlockExchange}. This is also a signal for the
+   * SendingMailbox that the sender is done sending data from its end. Note 
that this doesn't mean that the receiver
+   * has received all the data.
+   *
+   * <p>
+   * <b>Note:</b> While this is similar to a close() method that's usually 
provided with objects that hold releasable
+   * resources, the key difference is that a SendingMailbox cannot completely 
release the resources on its end
+   * gracefully, since it would be waiting for the receiver to ack that it has 
received all the data. See
+   * {@link #cancel} which can allow callers to force release the underlying 
resources.
+   * </p>
    */
-  void complete();
+  void complete()
+      throws Exception;
 
-  void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException;
+  /**
+   * A SendingMailbox is considered initialized after it has acquired a 
reference to the underlying channel that will
+   * be used to send data to the receiver.
+   */
+  boolean isInitialized();
 
+  /**
+   * Allows terminating the underlying channel.
+   */
   void cancel(Throwable t);
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java
new file mode 100644
index 0000000000..8bf65110be
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.pinot.query.mailbox.InMemoryMailboxService;
+import org.apache.pinot.query.mailbox.InMemoryReceivingMailbox;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+public class InMemoryTransferStream {
+
+  private MailboxIdentifier _mailboxId;
+  private BlockingQueue<TransferableBlock> _queue;
+  private InMemoryMailboxService _mailboxService;
+  private final long _deadlineMs;
+  private boolean _receivingMailboxInitialized = false;
+  private boolean _isCancelled = false;
+  private boolean _isCompleted = false;
+
+  public InMemoryTransferStream(MailboxIdentifier mailboxId, 
InMemoryMailboxService mailboxService, long deadlineMs) {
+    _mailboxId = mailboxId;
+    _queue = new LinkedBlockingQueue<>();
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
+  }
+
+  public void send(TransferableBlock block) {
+    Preconditions.checkState(!isCancelled(), "Tried to send on a cancelled 
InMemory stream");
+    // TODO: Deadline check can be more efficient.
+    // While, in most cases the receiver would have anyways called cancel, for 
expensive queries it is possible that
+    // the receiver may have hung-up before it could get a reference to the 
stream. This can happen if the sending
+    // OpChain was running an expensive operation (like a large hash-join).
+    long currentTime = System.currentTimeMillis();
+    Preconditions.checkState(currentTime < _deadlineMs,
+        String.format("Deadline exceeded by %s ms", currentTime - 
_deadlineMs));
+    if (!_receivingMailboxInitialized) {
+      InMemoryReceivingMailbox receivingMailbox =
+          (InMemoryReceivingMailbox) 
_mailboxService.getReceivingMailbox(_mailboxId);
+      receivingMailbox.init(this);
+      _receivingMailboxInitialized = true;
+    }
+    _queue.offer(block);
+  }
+
+  @Nullable
+  public TransferableBlock poll()
+      throws InterruptedException {
+    if (_isCancelled) {
+      return TransferableBlockUtils.getErrorTransferableBlock(
+          new RuntimeException("InMemoryTransferStream is cancelled"));
+    } else if (System.currentTimeMillis() > _deadlineMs) {
+      return TransferableBlockUtils.getErrorTransferableBlock(
+          new RuntimeException("Deadline reached for in-memory transfer 
stream"));
+    }
+    return _queue.poll();
+  }
+
+  public void complete() {
+    _isCompleted = true;
+  }
+
+  public int size() {
+    return _queue.size();
+  }
+
+  public void cancel() {
+    _isCancelled = true;
+    // Eagerly lose references to the underlying data.
+    _queue.clear();
+  }
+
+  public boolean isCompleted() {
+    return _isCompleted;
+  }
+
+  public boolean isCancelled() {
+    return _isCancelled;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 220e429caa..c8375d4eed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -19,15 +19,15 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
@@ -37,8 +37,6 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.Math.max;
-
 
 /**
  * {@code MailboxContentStreamObserver} is the content streaming observer used 
to receive mailbox content.
@@ -48,59 +46,41 @@ import static java.lang.Math.max;
  * to the sender side.
  */
 public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.MailboxContent> {
+  public static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+  private static final Mailbox.MailboxContent DEFAULT_ERROR_MAILBOX_CONTENT;
+  // This delta is added to the buffer offer on top of the query timeout to 
avoid a race between client cancellation
+  // due to deadline and server-side cancellation due to the receiving buffer 
being full.
+  private static final long BUFFER_OFFER_TIMEOUT_DELTA_MS = 1_000;
 
-  private static Mailbox.MailboxContent createErrorContent(Throwable e)
-      throws IOException {
-    return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
-            TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException(e)).getDataBlock().toBytes()))
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true").build();
+  static {
+    try {
+      RuntimeException exception = new RuntimeException(
+          "Error creating error-content.. please file a bug in the 
apache/pinot repo");
+      DEFAULT_ERROR_MAILBOX_CONTENT = 
Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+          
TransferableBlockUtils.getErrorTransferableBlock(exception).getDataBlock().toBytes()))
+          .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true").build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private final GrpcMailboxService _mailboxService;
   private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
-  private final boolean _isEnabledFeedback;
 
   private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
   private final BlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
 
-  private ReadWriteLock _bufferSizeLock = new ReentrantReadWriteLock();
-  @GuardedBy("bufferSizeLock")
-  private int _maxBufferSize = 0;
-  private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
-  @GuardedBy("_errorLock")
   private Mailbox.MailboxContent _errorContent = null;
   private JsonMailboxIdentifier _mailboxId;
   private Consumer<MailboxIdentifier> _gotMailCallback;
 
-  private void updateMaxBufferSize() {
-    _bufferSizeLock.writeLock().lock();
-    _maxBufferSize = max(_maxBufferSize, _receivingBuffer.size());
-    _bufferSizeLock.writeLock().unlock();
-  }
-
-  private int getMaxBufferSize() {
-    try {
-      _bufferSizeLock.readLock().lock();
-      return _maxBufferSize;
-    } finally {
-      _bufferSizeLock.readLock().unlock();
-    }
-  }
-
   public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
       StreamObserver<Mailbox.MailboxStatus> responseObserver) {
-    this(mailboxService, responseObserver, false);
-  }
-
-  public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
-      StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean 
isEnabledFeedback) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
-    // TODO: Replace unbounded queue with bounded queue when we have 
backpressure in place.
-    // It is possible this will create high memory pressure since we have 
memory leak issues.
-    _receivingBuffer = new LinkedBlockingQueue();
-    _isEnabledFeedback = isEnabledFeedback;
+    _receivingBuffer = new 
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_MAILBOX_CONTENT);
   }
 
   /**
@@ -110,92 +90,82 @@ public class MailboxContentStreamObserver implements 
StreamObserver<Mailbox.Mail
    * to indicate when to call this method.
    */
   public Mailbox.MailboxContent poll() {
-    try {
-      _errorLock.readLock().lock();
-      if (_errorContent != null) {
-        return _errorContent;
-      }
-    } finally {
-      _errorLock.readLock().unlock();
+    if (_errorContent != null) {
+      return _errorContent;
     }
-    if (isCompleted()) {
+    if (hasConsumedAllData()) {
       return null;
     }
 
     return _receivingBuffer.poll();
   }
 
-  public boolean isCompleted() {
-    return _isCompleted.get() && _receivingBuffer.isEmpty();
-  }
-
   @Override
   public void onNext(Mailbox.MailboxContent mailboxContent) {
     _mailboxId = JsonMailboxIdentifier.parse(mailboxContent.getMailboxId());
 
+    long remainingTimeMs = 
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
     GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) 
_mailboxService.getReceivingMailbox(_mailboxId);
-    _gotMailCallback = receivingMailbox.init(this);
+    _gotMailCallback = receivingMailbox.init(this, _responseObserver);
+    if (_errorContent != null) {
+      // This should never happen because gRPC calls StreamObserver in a 
single-threaded fashion, and if error-content
+      // is not null then we would have already issued a onError which means 
gRPC will not call onNext again.
+      LOGGER.warn("onNext called even though already errored out");
+      return;
+    }
 
     if 
(!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY))
 {
       // when the receiving end receives a message put it in the mailbox queue.
-      // TODO: pass a timeout to _receivingBuffer.
-      if (!_receivingBuffer.offer(mailboxContent)) {
-        // TODO: close the stream.
-        RuntimeException e = new RuntimeException("Mailbox receivingBuffer is 
full:" + _mailboxId);
-        LOGGER.error(e.getMessage());
-        try {
-          _errorLock.writeLock().lock();
-          _errorContent = createErrorContent(e);
-        } catch (IOException ioe) {
-          e = new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
-          LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
+      try {
+        final long offerTimeoutMs = remainingTimeMs + 
BUFFER_OFFER_TIMEOUT_DELTA_MS;
+        if (!_receivingBuffer.offer(mailboxContent, offerTimeoutMs, 
TimeUnit.MILLISECONDS)) {
+          RuntimeException e = new RuntimeException("Timed out offering to the 
receivingBuffer: " + _mailboxId);
           LOGGER.error(e.getMessage());
-          throw e;
-        } finally {
-          _errorLock.writeLock().unlock();
+          _errorContent = createErrorContent(e);
+          try {
+            _responseObserver.onError(Status.CANCELLED.asRuntimeException());
+          } catch (Exception ignored) {
+            // Exception can be thrown if the stream deadline has already been 
reached, so we simply ignore it.
+          }
         }
+      } catch (InterruptedException e) {
+        _errorContent = createErrorContent(e);
+        LOGGER.error("Interrupted while polling receivingBuffer", e);
+        _responseObserver.onError(Status.CANCELLED.asRuntimeException());
       }
       _gotMailCallback.accept(_mailboxId);
-
-      updateMaxBufferSize();
-
-      if (_isEnabledFeedback) {
-        // TODO: this has race conditions with onCompleted() because sender 
blindly closes connection channels once
-        // it has finished sending all the data packets.
-        int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
-        Mailbox.MailboxStatus.Builder builder =
-            
Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
-                .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, 
String.valueOf(remainingCapacity));
-        if 
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
 != null) {
-          builder.putAllMetadata(mailboxContent.getMetadataMap());
-          builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true");
-        }
-        Mailbox.MailboxStatus status = builder.build();
-        // returns the buffer available size to sender for rate controller / 
throttling.
-        _responseObserver.onNext(status);
-      }
     }
   }
 
   @Override
   public void onError(Throwable e) {
-    try {
-      _errorLock.writeLock().lock();
+    if (_errorContent == null) {
       _errorContent = createErrorContent(e);
-      _gotMailCallback.accept(_mailboxId);
-      throw new RuntimeException(e);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Unable to encode exception for cascade 
reporting: " + e, ioe);
-    } finally {
-      _errorLock.writeLock().unlock();
-      LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
     }
+    _gotMailCallback.accept(_mailboxId);
   }
 
   @Override
   public void onCompleted() {
     _isCompleted.set(true);
     _responseObserver.onCompleted();
-    LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", 
_mailboxId);
+  }
+
+  /**
+   * @return true if all data has been received via {@link #poll()}.
+   */
+  public boolean hasConsumedAllData() {
+    return _isCompleted.get() && _receivingBuffer.isEmpty();
+  }
+
+  private static Mailbox.MailboxContent createErrorContent(Throwable e) {
+    try {
+      return 
Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+          TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException(e)).getDataBlock().toBytes()))
+          .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, 
"true").build();
+    } catch (IOException ioException) {
+      LOGGER.error("Error creating error MailboxContent", ioException);
+      return DEFAULT_ERROR_MAILBOX_CONTENT;
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index fd7443db12..291a38325b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -38,10 +38,9 @@ public class MailboxStatusStreamObserver implements 
StreamObserver<Mailbox.Mailb
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
   private final AtomicInteger _bufferSize = new AtomicInteger(5);
 
-  private CountDownLatch _finishLatch;
+  private final CountDownLatch _finishLatch = new CountDownLatch(1);
 
-  public MailboxStatusStreamObserver(CountDownLatch finishLatch) {
-    _finishLatch = finishLatch;
+  public MailboxStatusStreamObserver() {
   }
 
   @Override
@@ -60,12 +59,15 @@ public class MailboxStatusStreamObserver implements 
StreamObserver<Mailbox.Mailb
   @Override
   public void onError(Throwable e) {
     _finishLatch.countDown();
-    LOGGER.error("Receiving error msg from grpc mailbox status stream:", e);
-    throw new RuntimeException(e);
+    LOGGER.error("[mailbox] Server returned onError", e);
   }
 
   @Override
   public void onCompleted() {
     _finishLatch.countDown();
   }
+
+  public boolean isFinished() {
+    return _finishLatch.getCount() == 0;
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a1d22c678f..f7967322a0 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -161,13 +161,38 @@ public class QueryRunner {
 
   public void processQuery(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataMap) {
     long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+    long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
     if (isLeafStage(distributedStagePlan)) {
-      // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
-      // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
-      // server executor.
+      runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs, 
requestId);
+    } else {
+      StageNode stageRoot = distributedStagePlan.getStageRoot();
+      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+          new PlanRequestContext(_mailboxService, requestId, 
stageRoot.getStageId(), timeoutMs, deadlineMs,
+              new VirtualServerAddress(distributedStagePlan.getServer()), 
distributedStagePlan.getMetadataMap()));
+      _scheduler.register(rootOperator);
+    }
+  }
+
+  public ExecutorService getQueryWorkerExecutorService() {
+    return _queryWorkerExecutorService;
+  }
+
+  public ExecutorService getQueryRunnerExecutorService() {
+    return _queryRunnerExecutorService;
+  }
+
+  private void runLeafStage(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataMap,
+      long deadlineMs, long requestId) {
+    // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
+    // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
+    // server executor.
+    MailboxSendOperator mailboxSendOperator = null;
+    try {
       long leafStageStartMillis = System.currentTimeMillis();
       List<ServerPlanRequestContext> serverQueryRequests =
-          constructServerQueryRequests(distributedStagePlan, 
requestMetadataMap, _helixPropertyStore, _mailboxService);
+          constructServerQueryRequests(distributedStagePlan, 
requestMetadataMap, _helixPropertyStore, _mailboxService,
+              deadlineMs);
 
       // send the data table via mailbox in one-off fashion (e.g. no 
block-level split, one data table/partition key)
       List<InstanceResponseBlock> serverQueryResults = new 
ArrayList<>(serverQueryRequests.size());
@@ -181,37 +206,27 @@ public class QueryRunner {
               + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = 
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
-      MailboxSendOperator mailboxSendOperator = new 
MailboxSendOperator(_mailboxService,
+      mailboxSendOperator = new MailboxSendOperator(_mailboxService,
           new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema(), requestId,
               sendNode.getStageId(), _rootServer), 
receivingStageMetadata.getServerInstances(),
           sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), 
_rootServer, requestId,
-          sendNode.getStageId(), sendNode.getReceiverStageId());
+          sendNode.getStageId(), sendNode.getReceiverStageId(), deadlineMs);
       int blockCounter = 0;
       while 
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
         LOGGER.debug("Acquired transferable block: {}", blockCounter++);
       }
-      mailboxSendOperator.toExplainString();
-    } else {
-      long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
-      StageNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
-          new PlanRequestContext(_mailboxService, requestId, 
stageRoot.getStageId(), timeoutMs,
-              new VirtualServerAddress(distributedStagePlan.getServer()), 
distributedStagePlan.getMetadataMap()));
-      _scheduler.register(rootOperator);
+      mailboxSendOperator.close();
+    } catch (Exception e) {
+      LOGGER.error(String.format("Error running leafStage for requestId=%s", 
requestId), e);
+      if (mailboxSendOperator != null) {
+        mailboxSendOperator.cancel(e);
+      }
     }
   }
 
-  public ExecutorService getQueryWorkerExecutorService() {
-    return _queryWorkerExecutorService;
-  }
-
-  public ExecutorService getQueryRunnerExecutorService() {
-    return _queryRunnerExecutorService;
-  }
-
   private static List<ServerPlanRequestContext> 
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
       Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> 
helixPropertyStore,
-      MailboxService<TransferableBlock> mailboxService) {
+      MailboxService<TransferableBlock> mailboxService, long deadlineMs) {
     StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
     Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
         "Server request for V2 engine should only have 1 scan table per 
request.");
@@ -231,7 +246,7 @@ public class QueryRunner {
             
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         requests.add(
             ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap, tableConfig,
-                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.OFFLINE, tableEntry.getValue()));
+                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
             
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
@@ -239,7 +254,7 @@ public class QueryRunner {
             
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         requests.add(
             ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap, tableConfig,
-                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.REALTIME, tableEntry.getValue()));
+                schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.REALTIME, tableEntry.getValue(), deadlineMs));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + 
tableType);
       }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 15ae722142..5e4260368b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -68,6 +68,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
         @Override
         public void runJob() {
           boolean isFinished = false;
+          boolean returnedErrorBlock = false;
           Throwable thrown = null;
           try {
             LOGGER.trace("({}): Executing", operatorChain);
@@ -88,6 +89,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
             } else {
               isFinished = true;
               if (result.isErrorBlock()) {
+                returnedErrorBlock = true;
                 LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
                     result.getDataBlock().getExceptions());
               } else {
@@ -99,11 +101,10 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
             LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
             thrown = e;
           } finally {
-            if (isFinished) {
-              closeOpChain(operatorChain);
-            } else if (thrown != null) {
-              // TODO: It would make sense to cancel OpChains if they returned 
an error-block.
+            if (returnedErrorBlock || thrown != null) {
               cancelOpChain(operatorChain, thrown);
+            } else if (isFinished) {
+              closeOpChain(operatorChain);
             }
           }
         }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 0b16a47229..74bd60e10d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -186,4 +186,20 @@ public class MailboxReceiveOperator extends 
MultiStageOperator {
             : TransferableBlockUtils.getEndOfStreamTransferableBlock();
     return block;
   }
+
+  @Override
+  public void close() {
+    super.close();
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 504a0d997b..f4e7976527 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -59,7 +59,8 @@ public class MailboxSendOperator extends MultiStageOperator {
   @VisibleForTesting
   interface BlockExchangeFactory {
     BlockExchange build(MailboxService<TransferableBlock> mailboxService, 
List<MailboxIdentifier> destinations,
-        RelDistribution.Type exchange, KeySelector<Object[], Object[]> 
selector, BlockSplitter splitter);
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> 
selector, BlockSplitter splitter,
+        long deadlineMs);
   }
 
   @VisibleForTesting
@@ -70,10 +71,10 @@ public class MailboxSendOperator extends MultiStageOperator 
{
   public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
-      VirtualServerAddress sendingServer, long jobId, int senderStageId, int 
receiverStageId) {
+      VirtualServerAddress sendingServer, long jobId, int senderStageId, int 
receiverStageId, long deadlineMs) {
     this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, 
exchangeType, keySelector,
         server -> toMailboxId(server, jobId, senderStageId, receiverStageId, 
sendingServer), BlockExchange::getExchange,
-        jobId, senderStageId, receiverStageId, sendingServer);
+        jobId, senderStageId, receiverStageId, sendingServer, deadlineMs);
   }
 
   @VisibleForTesting
@@ -81,7 +82,7 @@ public class MailboxSendOperator extends MultiStageOperator {
       MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> 
receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory 
blockExchangeFactory, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress serverAddress) {
+      int receiverStageId, VirtualServerAddress serverAddress, long 
deadlineMs) {
     super(jobId, senderStageId, serverAddress);
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
 
@@ -111,7 +112,8 @@ public class MailboxSendOperator extends MultiStageOperator 
{
     }
 
     BlockSplitter splitter = TransferableBlockUtils::splitBlock;
-    _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, 
exchangeType, keySelector, splitter);
+    _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, 
exchangeType, keySelector, splitter,
+        deadlineMs);
 
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
         String.format("Exchange type '%s' is not supported yet", 
exchangeType));
@@ -141,9 +143,6 @@ public class MailboxSendOperator extends MultiStageOperator 
{
         transferableBlock = _dataTableBlockBaseOperator.nextBlock();
       }
     } catch (final Exception e) {
-      // ideally, MailboxSendOperator doesn't ever throw an exception because
-      // it will just get swallowed, in this scenario at least we can forward
-      // any upstream exceptions as an error  block
       transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
       try {
         _exchange.send(transferableBlock);
@@ -154,6 +153,18 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     return transferableBlock;
   }
 
+  @Override
+  public void close() {
+    super.close();
+    _exchange.close();
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    _exchange.cancel(t);
+  }
+
   private static JsonMailboxIdentifier toMailboxId(
       VirtualServer destination, long jobId, int senderStageId, int 
receiverStageId, VirtualServerAddress sender) {
     return new JsonMailboxIdentifier(
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index e6f131b33e..03d9ee2d23 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -36,6 +38,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlock;
  * exchanging data across different servers.
  */
 public abstract class BlockExchange {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(BlockExchange.class);
   // TODO: Deduct this value via grpc config maximum byte size; and make it 
configurable with override.
   // TODO: Max block size is a soft limit. only counts fixedSize datatable 
byte buffer
   private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
@@ -44,10 +47,10 @@ public abstract class BlockExchange {
 
   public static BlockExchange getExchange(MailboxService<TransferableBlock> 
mailboxService,
       List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType, 
KeySelector<Object[], Object[]> selector,
-      BlockSplitter splitter) {
+      BlockSplitter splitter, long deadlineMs) {
     List<SendingMailbox<TransferableBlock>> sendingMailboxes = new 
ArrayList<>();
     for (MailboxIdentifier mid : destinations) {
-      sendingMailboxes.add(mailboxService.getSendingMailbox(mid));
+      sendingMailboxes.add(mailboxService.getSendingMailbox(mid, deadlineMs));
     }
     switch (exchangeType) {
       case SINGLETON:
@@ -71,15 +74,19 @@ public abstract class BlockExchange {
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block) {
+  public void send(TransferableBlock block)
+      throws Exception {
     if (block.isEndOfStreamBlock()) {
-      _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
+      for (SendingMailbox<TransferableBlock> sendingMailbox : 
_sendingMailboxes) {
+        sendBlock(sendingMailbox, block);
+      }
       return;
     }
     route(_sendingMailboxes, block);
   }
 
-  protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, 
TransferableBlock block) {
+  protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, 
TransferableBlock block)
+      throws Exception {
     if (block.isEndOfStreamBlock()) {
       sendingMailbox.send(block);
       sendingMailbox.complete();
@@ -93,5 +100,17 @@ public abstract class BlockExchange {
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> 
destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+  }
+
+  public void cancel(Throwable t) {
+    for (SendingMailbox<TransferableBlock> sendingMailbox : _sendingMailboxes) 
{
+      sendingMailbox.cancel(t);
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index 932f7593b6..e9c44d7502 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -34,7 +34,8 @@ class BroadcastExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block) {
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block)
+      throws Exception {
     for (SendingMailbox mailbox : destinations) {
       sendBlock(mailbox, block);
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 99f0e04f91..b7d5a4a15c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -43,7 +43,8 @@ class HashExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block) {
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block)
+      throws Exception {
     List<Object[]>[] destIdxToRows = new List[destinations.size()];
     for (Object[] row : block.getContainer()) {
       int partition = _keySelector.computeHash(row) % destinations.size();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index 0073e28bf0..dc015378ca 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -48,7 +48,8 @@ class RandomExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block) {
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block)
+      throws Exception {
     int destinationIdx = _rand.apply(destinations.size());
     sendBlock(destinations.get(destinationIdx), block);
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 713cfa5ba2..ebe9eb370e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -35,7 +35,8 @@ class SingletonExchange extends BlockExchange {
   }
 
   @Override
-  protected void route(List<SendingMailbox<TransferableBlock>> mailbox, 
TransferableBlock block) {
+  protected void route(List<SendingMailbox<TransferableBlock>> mailbox, 
TransferableBlock block)
+      throws Exception {
     for (SendingMailbox sendingMailbox : mailbox) {
       sendBlock(sendingMailbox, block);
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index dec70bd4e2..8ff1c041d1 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -80,7 +80,8 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<MultiStageOperator,
     StageMetadata receivingStageMetadata = 
context.getMetadataMap().get(node.getReceiverStageId());
     return new MailboxSendOperator(context.getMailboxService(), nextOperator,
         receivingStageMetadata.getServerInstances(), node.getExchangeType(), 
node.getPartitionKeySelector(),
-        context.getServer(), context.getRequestId(), node.getStageId(), 
node.getReceiverStageId());
+        context.getServer(), context.getRequestId(), node.getStageId(), 
node.getReceiverStageId(),
+        context.getDeadlineMs());
   }
 
   @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index db5b0e028a..dac0f97575 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -33,18 +33,21 @@ public class PlanRequestContext {
   protected final MailboxService<TransferableBlock> _mailboxService;
   protected final long _requestId;
   protected final int _stageId;
+  // TODO: Timeout is not needed since deadline is already present.
   private final long _timeoutMs;
+  private final long _deadlineMs;
   protected final VirtualServerAddress _server;
   protected final Map<Integer, StageMetadata> _metadataMap;
   protected final List<MailboxIdentifier> _receivingMailboxes = new 
ArrayList<>();
 
 
   public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, 
long requestId, int stageId,
-      long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> 
metadataMap) {
+      long timeoutMs, long deadlineMs, VirtualServerAddress server, 
Map<Integer, StageMetadata> metadataMap) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _timeoutMs = timeoutMs;
+    _deadlineMs = deadlineMs;
     _server = server;
     _metadataMap = metadataMap;
   }
@@ -61,6 +64,10 @@ public class PlanRequestContext {
     return _timeoutMs;
   }
 
+  public long getDeadlineMs() {
+    return _deadlineMs;
+  }
+
   public VirtualServerAddress getServer() {
     return _server;
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 55141cc739..b6194bd15b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -91,7 +91,7 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
 
   public static ServerPlanRequestContext 
build(MailboxService<TransferableBlock> mailboxService,
       DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, 
TableConfig tableConfig, Schema schema,
-      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> 
segmentList) {
+      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> 
segmentList, long deadlineMs) {
     // Before-visit: construct the ServerPlanRequestContext baseline
     // Making a unique requestId for leaf stages otherwise it causes problem 
on stats/metrics/tracing.
     long requestId = 
(Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) 
<< 16)
@@ -107,7 +107,7 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
     pinotQuery.setExplain(false);
     ServerPlanRequestContext context =
-        new ServerPlanRequestContext(mailboxService, requestId, 
stagePlan.getStageId(), timeoutMs,
+        new ServerPlanRequestContext(mailboxService, requestId, 
stagePlan.getStageId(), timeoutMs, deadlineMs,
             new VirtualServerAddress(stagePlan.getServer()), 
stagePlan.getMetadataMap(), pinotQuery, tableType,
             timeBoundaryInfo);
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 35142fe0cf..4403d35e6d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -42,9 +42,9 @@ public class ServerPlanRequestContext extends 
PlanRequestContext {
   protected InstanceRequest _instanceRequest;
 
   public ServerPlanRequestContext(MailboxService<TransferableBlock> 
mailboxService, long requestId, int stageId,
-      long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> 
metadataMap, PinotQuery pinotQuery,
-      TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
-    super(mailboxService, requestId, stageId, timeoutMs, server, metadataMap);
+      long timeoutMs, long deadlineMs, VirtualServerAddress server, 
Map<Integer, StageMetadata> metadataMap,
+      PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo 
timeBoundaryInfo) {
+    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, 
metadataMap);
     _pinotQuery = pinotQuery;
     _tableType = tableType;
     _timeBoundaryInfo = timeBoundaryInfo;
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 9b7d0426de..7d08439601 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -21,12 +21,16 @@ package org.apache.pinot.query.mailbox;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.service.QueryConfig;
@@ -76,13 +80,10 @@ public class GrpcMailboxServiceTest {
   @Test(timeOut = 10_000L)
   public void testHappyPath()
       throws Exception {
+    final long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
-    JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
-        "happypath",
-        new VirtualServerAddress("localhost", 
_mailboxService1.getMailboxPort(), 0),
-        new VirtualServerAddress("localhost", 
_mailboxService2.getMailboxPort(), 0),
-        DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
-    SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
+    JsonMailboxIdentifier mailboxId = createMailboxId("happypath");
+    SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
     _mail2GotData.set(ignored -> gotData.countDown());
@@ -109,14 +110,10 @@ public class GrpcMailboxServiceTest {
   @Test(timeOut = 10_000L)
   public void testGrpcException()
       throws Exception {
+    final long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
-    JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
-        "exception",
-        new VirtualServerAddress("localhost", 
_mailboxService1.getMailboxPort(), 0),
-        new VirtualServerAddress("localhost", 
_mailboxService2.getMailboxPort(), 0),
-        DEFAULT_SENDER_STAGE_ID,
-        DEFAULT_RECEIVER_STAGE_ID);
-    SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId);
+    JsonMailboxIdentifier mailboxId = createMailboxId("exception");
+    SendingMailbox<TransferableBlock> sendingMailbox = 
_mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
     ReceivingMailbox<TransferableBlock> receivingMailbox = 
_mailboxService2.getReceivingMailbox(mailboxId);
     CountDownLatch gotData = new CountDownLatch(1);
     _mail2GotData.set(ignored -> gotData.countDown());
@@ -134,6 +131,207 @@ public class GrpcMailboxServiceTest {
     Assert.assertFalse(receivedDataBlock.getExceptions().isEmpty());
   }
 
+  /**
+   * When the connection reaches deadline before the EOS block could be sent, 
the receiving mailbox should return a
+   * error block.
+   */
+  @Test
+  public void testGrpcStreamDeadline()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 1_000;
+    JsonMailboxIdentifier mailboxId = createMailboxId("conndeadline");
+
+    GrpcSendingMailbox grpcSendingMailbox =
+        (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, 
deadlineMs);
+    GrpcReceivingMailbox grpcReceivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+    CountDownLatch latch = new CountDownLatch(2);
+    Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+      @Override
+      public void accept(MailboxIdentifier mailboxIdentifier) {
+        latch.countDown();
+      }
+    };
+    _mail2GotData.set(callback);
+
+    // Send 1 normal block.
+    grpcSendingMailbox.send(getTestTransferableBlock());
+
+    // Latch had started with count=2. We don't send any EOS block and instead 
wait for connection deadline to
+    // trigger the next callback. The latch won't await the full wait timeout 
and instead should return immediately
+    // as soon as the deadline is hit and MailboxContentStreamObserver#onError 
is called.
+    Assert.assertTrue(latch.await(4_000, TimeUnit.SECONDS));
+
+    // In case of errors, MailboxContentStreamObserver short-circuits and 
skips returning the normal data-block.
+    TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    Map<Integer, String> exceptions = 
receivedBlock.getDataBlock().getExceptions();
+    Assert.assertTrue(MapUtils.isNotEmpty(exceptions));
+    String exceptionMessage = exceptions.values().iterator().next();
+    Assert.assertTrue(exceptionMessage.contains("CANCELLED"));
+
+    // GrpcReceivingMailbox#cancel shouldn't throw and instead silently 
swallow exception
+    grpcReceivingMailbox.cancel();
+  }
+
+  /**
+   * This test ensures that when the buffer in MailboxContentStreamObserver is 
full:
+   *
+   * 1. The sender is not blocked and can complete successfully.
+   * 2. The gotMail callback is called (bufferSize + 1) times.
+   * 3. The offer to the buffer in MailboxContentStreamObserver times out 
around the time the query deadline is reached.
+   * 4. A error-block is returned by a subsequent {@link 
GrpcReceivingMailbox#receive()} call.
+   */
+  @Test
+  public void testMailboxContentStreamBufferFull()
+      throws Exception {
+    final int bufferSize = 
MailboxContentStreamObserver.DEFAULT_MAX_PENDING_MAILBOX_CONTENT;
+    long queryTimeoutMs = 2_000;
+    long deadlineMs = System.currentTimeMillis() + queryTimeoutMs;
+    int blocksSent = 20;
+    JsonMailboxIdentifier mailboxId = createMailboxId("buffer-full");
+
+    GrpcSendingMailbox grpcSendingMailbox =
+        (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, 
deadlineMs);
+    GrpcReceivingMailbox grpcReceivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+    CountDownLatch bufferSizeLatch = new CountDownLatch(bufferSize);
+    CountDownLatch bufferSizePlusOneLatch = new CountDownLatch(bufferSize + 1);
+    CountDownLatch bufferSizePlusTwoLatch = new CountDownLatch(bufferSize + 2);
+    CountDownLatch bufferSizePlusThreeLatch = new CountDownLatch(bufferSize + 
3);
+    Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+      @Override
+      public void accept(MailboxIdentifier mailboxIdentifier) {
+        bufferSizeLatch.countDown();
+        bufferSizePlusOneLatch.countDown();
+        bufferSizePlusTwoLatch.countDown();
+        bufferSizePlusThreeLatch.countDown();
+      }
+    };
+    _mail2GotData.set(callback);
+
+    // Sending mailbox will not be blocked if receiver buffer is full
+    for (int i = 0; i < blocksSent; i++) {
+      grpcSendingMailbox.send(getTestTransferableBlock());
+    }
+    grpcSendingMailbox.complete();
+
+    // Ensure that the buffer is completely filled
+    Assert.assertTrue(bufferSizeLatch.await(1, TimeUnit.SECONDS));
+    // Wait for the buffer offer to fail. After it fails, gotMail callback 
will be called once more in onNext
+    Assert.assertTrue(bufferSizePlusOneLatch.await(queryTimeoutMs + 1_000, 
TimeUnit.MILLISECONDS));
+    // Since buffer offer fails after the stream deadline has already been 
reached,
+    // MailboxContentStreamObserver#onError will be called
+    Assert.assertTrue(bufferSizePlusTwoLatch.await(1, TimeUnit.SECONDS));
+    // gotMail callback will be called (bufferSize + 1) times from onNext and 
once from onError, for a total of
+    // (bufferSize + 2) calls. The following latch await ensures that the 
callback is never called more than that.
+    Assert.assertFalse(bufferSizePlusThreeLatch.await(1, TimeUnit.SECONDS));
+
+    // Ensure that a error-block is returned by the receiving mailbox.
+    TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    Map<Integer, String> exceptions = 
receivedBlock.getDataBlock().getExceptions();
+    Assert.assertTrue(exceptions.size() > 0);
+    Assert.assertTrue(exceptions.values().iterator().next().contains("Timed 
out offering to the receivingBuffer"));
+  }
+
+  /**
+   * This test ensures that when a stream is cancelled by the receiver, any 
future sends by the sender will throw.
+   */
+  @Test
+  public void testStreamCancellationByReceiver()
+      throws Exception {
+    // set a large deadline
+    long deadlineMs = System.currentTimeMillis() + 120_000;
+    JsonMailboxIdentifier mailboxId = createMailboxId("recv-cancel");
+
+    GrpcSendingMailbox grpcSendingMailbox =
+        (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, 
deadlineMs);
+    GrpcReceivingMailbox grpcReceivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+    CountDownLatch receivedDataLatch = new CountDownLatch(1);
+    Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+      @Override
+      public void accept(MailboxIdentifier mailboxIdentifier) {
+        receivedDataLatch.countDown();
+      }
+    };
+    _mail2GotData.set(callback);
+
+    // Send and receive 1 data block to ensure stream is established
+    grpcSendingMailbox.send(getTestTransferableBlock());
+    Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS));
+    TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+    // Receiver issues a cancellation
+    grpcReceivingMailbox.cancel();
+
+    // Send from sender will now throw. We await a few milliseconds since 
cancellation may have a lag in getting
+    // processed at the other side.
+    CountDownLatch neverEndingLatch = new CountDownLatch(1);
+    try {
+      Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS));
+      grpcSendingMailbox.send(getTestTransferableBlock());
+      Assert.fail("Send call above should have thrown since the stream is 
cancelled");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Called send when stream is 
already closed"));
+    }
+  }
+
+  @Test
+  public void testStreamCancellationBySender()
+      throws Exception {
+    // set a large deadline
+    long deadlineMs = System.currentTimeMillis() + 120_000;
+    JsonMailboxIdentifier mailboxId = createMailboxId("sender-cancel");
+
+    GrpcSendingMailbox grpcSendingMailbox =
+        (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId, 
deadlineMs);
+    GrpcReceivingMailbox grpcReceivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+    CountDownLatch receivedDataLatch = new CountDownLatch(1);
+    Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+      @Override
+      public void accept(MailboxIdentifier mailboxIdentifier) {
+        receivedDataLatch.countDown();
+      }
+    };
+    _mail2GotData.set(callback);
+
+    // Send and receive 1 data block to ensure stream is established
+    grpcSendingMailbox.send(getTestTransferableBlock());
+    Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS));
+    TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+    // Sender issues a cancellation
+    grpcSendingMailbox.cancel(new RuntimeException("foo"));
+
+    // receiving mailbox should return a error-block
+    CountDownLatch neverEndingLatch = new CountDownLatch(1);
+    Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS));
+    receivedBlock = grpcReceivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+  }
+
+  private JsonMailboxIdentifier createMailboxId(String jobId) {
+    return new JsonMailboxIdentifier(
+        jobId,
+        new VirtualServerAddress("localhost", 
_mailboxService1.getMailboxPort(), 0),
+        new VirtualServerAddress("localhost", 
_mailboxService2.getMailboxPort(), 0),
+        DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+  }
+
   private TransferableBlock getTestTransferableBlock() {
     List<Object[]> rows = new ArrayList<>();
     rows.add(createRow(0, "test_string"));
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 0c1351fd04..78edcec092 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -20,11 +20,13 @@ package org.apache.pinot.query.mailbox;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -33,6 +35,12 @@ public class InMemoryMailboxServiceTest {
 
   private static final int DEFAULT_SENDER_STAGE_ID = 0;
   private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
+  private static final JsonMailboxIdentifier MAILBOX_ID = new 
JsonMailboxIdentifier(
+      String.format("%s_%s", 1234, DEFAULT_RECEIVER_STAGE_ID),
+      new VirtualServerAddress("localhost", 0, 0),
+      new VirtualServerAddress("localhost", 0, 0),
+      DEFAULT_SENDER_STAGE_ID,
+      DEFAULT_RECEIVER_STAGE_ID);
   private static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new 
String[]{"foo", "bar"},
       new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, 
DataSchema.ColumnDataType.STRING});
   private static final int NUM_ENTRIES = 5;
@@ -40,13 +48,12 @@ public class InMemoryMailboxServiceTest {
   @Test
   public void testHappyPath()
       throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
-    final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
-        "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 0, 0),
-        DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
     InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) 
mailboxService.getReceivingMailbox(
-        mailboxId);
-    InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox) 
mailboxService.getSendingMailbox(mailboxId);
+        MAILBOX_ID);
+    InMemorySendingMailbox sendingMailbox =
+        (InMemorySendingMailbox) mailboxService.getSendingMailbox(MAILBOX_ID, 
deadlineMs);
 
     // Sends are non-blocking as long as channel capacity is not breached
     for (int i = 0; i < NUM_ENTRIES; i++) {
@@ -77,14 +84,15 @@ public class InMemoryMailboxServiceTest {
    */
   @Test
   public void testNonLocalMailboxId() {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
-    final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+    final JsonMailboxIdentifier nonLocalMailboxId = new JsonMailboxIdentifier(
         "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new 
VirtualServerAddress("localhost", 1, 0),
         DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
 
     // Test getReceivingMailbox
     try {
-      mailboxService.getReceivingMailbox(mailboxId);
+      mailboxService.getReceivingMailbox(nonLocalMailboxId);
       Assert.fail("Method call above should have failed");
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("non-local transport"));
@@ -92,16 +100,109 @@ public class InMemoryMailboxServiceTest {
 
     // Test getSendingMailbox
     try {
-      mailboxService.getSendingMailbox(mailboxId);
+      mailboxService.getSendingMailbox(nonLocalMailboxId, deadlineMs);
       Assert.fail("Method call above should have failed");
     } catch (IllegalStateException e) {
       Assert.assertTrue(e.getMessage().contains("non-local transport"));
     }
   }
 
+  @Test
+  public void testInMemoryStreamCancellationByReceiver()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
+    InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+    SendingMailbox<TransferableBlock> sendingMailbox = 
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+    ReceivingMailbox<TransferableBlock> receivingMailbox = 
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+    // Send and receive one data block
+    sendingMailbox.send(getTestTransferableBlock(0, false));
+    TransferableBlock receivedBlock = receivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+    receivingMailbox.cancel();
+
+    // After the stream is cancelled, sender will start seeing errors
+    try {
+      sendingMailbox.send(getTestTransferableBlock(1, false));
+      Assert.fail("Method call above should have failed");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("cancelled InMemory"));
+    }
+
+    // Cancel is idempotent for both sending and receiving mailbox so safe to 
call multiple times
+    receivingMailbox.cancel();
+    sendingMailbox.cancel(new RuntimeException("foo"));
+  }
+
+  @Test
+  public void testInMemoryStreamCancellationBySender()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
+    InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+    SendingMailbox<TransferableBlock> sendingMailbox = 
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+    ReceivingMailbox<TransferableBlock> receivingMailbox = 
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+    // Send and receive one data block
+    sendingMailbox.send(getTestTransferableBlock(0, false));
+    TransferableBlock receivedBlock = receivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+    sendingMailbox.cancel(new RuntimeException("foo"));
+
+    // After the stream is cancelled, receiver will get error-blocks
+    receivedBlock = receivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+
+    // Cancel is idempotent for both sending and receiving mailbox so safe to 
call multiple times
+    sendingMailbox.cancel(new RuntimeException("foo"));
+    receivingMailbox.cancel();
+  }
+
+  @Test
+  public void testInMemoryStreamTimeOut()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 1000;
+    InMemoryMailboxService mailboxService = new 
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+    SendingMailbox<TransferableBlock> sendingMailbox = 
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+    ReceivingMailbox<TransferableBlock> receivingMailbox = 
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+    // Send and receive one data block
+    sendingMailbox.send(getTestTransferableBlock(0, false));
+    TransferableBlock receivedBlock = receivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+    CountDownLatch neverEndingLatch = new CountDownLatch(1);
+    Assert.assertFalse(neverEndingLatch.await(1, TimeUnit.SECONDS));
+
+    // Sends for the mailbox will throw
+    try {
+      sendingMailbox.send(getTestTransferableBlock(0, false));
+      Assert.fail("Method call above should have failed");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Deadline"));
+    }
+
+    // Receiver will receive error-blocks after stream timeout
+    receivedBlock = receivingMailbox.receive();
+    Assert.assertNotNull(receivedBlock);
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+
+    // Cancel will be a no-op and will not throw.
+    sendingMailbox.cancel(new RuntimeException("foo"));
+    receivingMailbox.cancel();
+  }
+
   private TransferableBlock getTestTransferableBlock(int index, boolean 
isEndOfStream) {
     if (isEndOfStream) {
-      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     }
     List<Object[]> rows = new ArrayList<>(index);
     rows.add(new Object[]{index, "test_data"});
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 01232aa03e..89d5e722b3 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -45,11 +45,11 @@ public class MultiplexingMailboxServiceTest {
     Mockito.doReturn(1000).when(grpcMailboxService).getMailboxPort();
     Mockito.doReturn(1000).when(inMemoryMailboxService).getMailboxPort();
     
Mockito.doReturn(Mockito.mock(InMemorySendingMailbox.class)).when(inMemoryMailboxService).getSendingMailbox(
-        Mockito.any());
+        Mockito.any(), Mockito.anyLong());
     
Mockito.doReturn(Mockito.mock(InMemoryReceivingMailbox.class)).when(inMemoryMailboxService).getReceivingMailbox(
         Mockito.any());
     
Mockito.doReturn(Mockito.mock(GrpcSendingMailbox.class)).when(grpcMailboxService).getSendingMailbox(
-        Mockito.any());
+        Mockito.any(), Mockito.anyLong());
     
Mockito.doReturn(Mockito.mock(GrpcReceivingMailbox.class)).when(grpcMailboxService).getReceivingMailbox(
         Mockito.any());
 
@@ -66,8 +66,8 @@ public class MultiplexingMailboxServiceTest {
     Assert.assertEquals("localhost", multiplexService.getHostname());
     Assert.assertEquals(1000, multiplexService.getMailboxPort());
 
-    Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID) 
instanceof InMemorySendingMailbox);
-    Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID) 
instanceof GrpcSendingMailbox);
+    Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID, -1) 
instanceof InMemorySendingMailbox);
+    Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID, 
-1) instanceof GrpcSendingMailbox);
 
     Assert.assertTrue(multiplexService.getReceivingMailbox(LOCAL_MAILBOX_ID) 
instanceof InMemoryReceivingMailbox);
     
Assert.assertTrue(multiplexService.getReceivingMailbox(NON_LOCAL_MAILBOX_ID) 
instanceof GrpcReceivingMailbox);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index aea548607a..4ba8090e09 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -183,7 +183,7 @@ public class OpChainSchedulerServiceTest {
   }
 
   @Test
-  public void shouldCallCloseOnOperatorsThatReturnErrorBlock()
+  public void shouldCallCancelOnOperatorsThatReturnErrorBlock()
       throws InterruptedException {
     initExecutor(1);
     OpChain opChain = getChain(_operatorA);
@@ -196,7 +196,7 @@ public class OpChainSchedulerServiceTest {
     Mockito.doAnswer(inv -> {
       latch.countDown();
       return null;
-    }).when(_operatorA).close();
+    }).when(_operatorA).cancel(Mockito.any());
 
     scheduler.startAsync().awaitRunning();
     scheduler.register(opChain);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index b916b5cc5e..ea7df49829 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -64,7 +64,8 @@ public class MailboxSendOperatorTest {
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
-    Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.any()))
+    Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.anyLong()))
         .thenReturn(_exchange);
 
     Mockito.when(_server.getHostname()).thenReturn("mock");
@@ -79,13 +80,15 @@ public class MailboxSendOperatorTest {
   }
 
   @Test
-  public void shouldSwallowNoOpBlockFromUpstream() {
+  public void shouldSwallowNoOpBlockFromUpstream()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, 
_input, ImmutableList.of(_server),
         RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", 
DEFAULT_SENDER_STAGE_ID,
             DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, 
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
-        new VirtualServerAddress(_server));
+        new VirtualServerAddress(_server), deadlineMs);
     
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
     // When:
@@ -97,13 +100,15 @@ public class MailboxSendOperatorTest {
   }
 
   @Test
-  public void shouldSendErrorBlock() {
+  public void shouldSendErrorBlock()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, 
_input, ImmutableList.of(_server),
         RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", 
DEFAULT_SENDER_STAGE_ID,
             DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, 
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
-        new VirtualServerAddress(_server));
+        new VirtualServerAddress(_server), deadlineMs);
     TransferableBlock errorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
     Mockito.when(_input.nextBlock()).thenReturn(errorBlock);
 
@@ -116,13 +121,15 @@ public class MailboxSendOperatorTest {
   }
 
   @Test
-  public void shouldSendErrorBlockWhenInputThrows() {
+  public void shouldSendErrorBlockWhenInputThrows()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, 
_input, ImmutableList.of(_server),
         RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", 
DEFAULT_SENDER_STAGE_ID,
             DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, 
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
-        new VirtualServerAddress(_server));
+        new VirtualServerAddress(_server), deadlineMs);
     Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
     ArgumentCaptor<TransferableBlock> captor = 
ArgumentCaptor.forClass(TransferableBlock.class);
 
@@ -136,13 +143,15 @@ public class MailboxSendOperatorTest {
   }
 
   @Test
-  public void shouldSendEosBlock() {
+  public void shouldSendEosBlock()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, 
_input, ImmutableList.of(_server),
         RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", 
DEFAULT_SENDER_STAGE_ID,
             DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, 
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
-        new VirtualServerAddress(_server));
+        new VirtualServerAddress(_server), deadlineMs);
     TransferableBlock eosBlock = 
TransferableBlockUtils.getEndOfStreamTransferableBlock();
     Mockito.when(_input.nextBlock()).thenReturn(eosBlock);
 
@@ -155,13 +164,15 @@ public class MailboxSendOperatorTest {
   }
 
   @Test
-  public void shouldSendDataBlock() {
+  public void shouldSendDataBlock()
+      throws Exception {
+    long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
     MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, 
_input, ImmutableList.of(_server),
         RelDistribution.Type.HASH_DISTRIBUTED, _selector,
         server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", 
DEFAULT_SENDER_STAGE_ID,
             DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, 
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
-        new VirtualServerAddress(_server));
+        new VirtualServerAddress(_server), deadlineMs);
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new 
DataSchema.ColumnDataType[]{}));
     Mockito.when(_input.nextBlock()).thenReturn(dataBlock)
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 033f010a8c..6afdf89a03 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -58,7 +58,8 @@ public class BlockExchangeTest {
   }
 
   @Test
-  public void shouldSendEosBlockToAllDestinations() {
+  public void shouldSendEosBlockToAllDestinations()
+      throws Exception {
     // Given:
     List<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1, _mailbox2);
     BlockExchange exchange = new TestBlockExchange(destinations);
@@ -78,7 +79,8 @@ public class BlockExchangeTest {
   }
 
   @Test
-  public void shouldSendDataBlocksOnlyToTargetDestination() {
+  public void shouldSendDataBlocksOnlyToTargetDestination()
+      throws Exception {
     // Given:
     List<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1);
     BlockExchange exchange = new TestBlockExchange(destinations);
@@ -97,7 +99,8 @@ public class BlockExchangeTest {
   }
 
   @Test
-  public void shouldSplitBlocks() {
+  public void shouldSplitBlocks()
+      throws Exception {
     // Given:
     List<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1);
 
@@ -138,7 +141,8 @@ public class BlockExchangeTest {
     }
 
     @Override
-    protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block) {
+    protected void route(List<SendingMailbox<TransferableBlock>> destinations, 
TransferableBlock block)
+        throws Exception {
       for (SendingMailbox mailbox : destinations) {
         sendBlock(mailbox, block);
       }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index b39d8e43fb..f815e00b78 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -57,7 +57,8 @@ public class BroadcastExchangeTest {
   }
 
   @Test
-  public void shouldBroadcast() {
+  public void shouldBroadcast()
+      throws Exception {
     // Given:
     ImmutableList<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1, _mailbox2);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 50b8b4bdc9..1e140553ca 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -63,7 +63,8 @@ public class HashExchangeTest {
   }
 
   @Test
-  public void shouldSplitAndRouteBlocksBasedOnPartitionKey() {
+  public void shouldSplitAndRouteBlocksBasedOnPartitionKey()
+      throws Exception {
     // Given:
     TestSelector selector = new TestSelector(Iterators.forArray(2, 0, 1));
     Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(new 
Object[]{0}, new Object[]{1}, new Object[]{2}));
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index ab97ea6196..aca4180174 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -56,7 +56,8 @@ public class RandomExchangeTest {
   }
 
   @Test
-  public void shouldRouteRandomly() {
+  public void shouldRouteRandomly()
+      throws Exception {
     // Given:
     ImmutableList<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1, _mailbox2);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 09855be2a4..3a964494ec 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -54,7 +54,8 @@ public class SingletonExchangeTest {
   }
 
   @Test
-  public void shouldRouteSingleton() {
+  public void shouldRouteSingleton()
+      throws Exception {
     // Given:
     ImmutableList<SendingMailbox<TransferableBlock>> destinations = 
ImmutableList.of(_mailbox1);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to