This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 98374e87cf7 Change SendingMailbox to be closeable. (#16899)
98374e87cf7 is described below

commit 98374e87cf73cce1a6aa136706d41df611a61f4d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Oct 17 16:55:20 2025 +0200

    Change SendingMailbox to be closeable. (#16899)
---
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 41 ++++++++++++++++++----
 .../query/mailbox/InMemorySendingMailbox.java      | 16 +++++++++
 .../apache/pinot/query/mailbox/SendingMailbox.java |  2 +-
 .../mailbox/channel/MailboxContentObserver.java    |  2 +-
 .../mailbox/channel/MailboxStatusObserver.java     | 23 +++++++++++-
 .../apache/pinot/query/runtime/QueryRunner.java    |  8 ++---
 .../runtime/operator/exchange/BlockExchange.java   | 29 +++++++++++++--
 .../runtime/queries/ResourceBasedQueriesTest.java  |  1 +
 8 files changed, 105 insertions(+), 17 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index bdc9f5253a4..7bb8840c14d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -68,6 +68,8 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new 
MailboxStatusObserver();
   private final int _maxByteStringSize;
+  /// Indicates whether the sending side has attempted to close the mailbox 
(either via complete() or cancel()).
+  private volatile boolean _senderSideClosed;
 
   private StreamObserver<MailboxContent> _contentObserver;
 
@@ -97,16 +99,21 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
   @Override
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
-    sendInternal(block, serializedStats);
-    LOGGER.debug("Completing mailbox: {}", _id);
-    _contentObserver.onCompleted();
+    if (sendInternal(block, serializedStats)) {
+      LOGGER.debug("Completing mailbox: {}", _id);
+      _contentObserver.onCompleted();
+      _senderSideClosed = true;
+    } else {
+      LOGGER.warn("Trying to send EOS to the already terminated mailbox: {}", 
_id);
+    }
   }
 
-  private void sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
+  /// Tries to send the block to the receiver. Returns true if the block is 
sent, false otherwise.
+  private boolean sendInternal(MseBlock block, List<DataBuffer> 
serializedStats) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
       LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox. 
Skipping sending message {} to: {}",
           block, _id);
-      return;
+      return false;
     }
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[GRPC SEND]== sending message " + block + " to: " + _id);
@@ -122,6 +129,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[GRPC SEND]== message " + block + " sent to: " + _id);
     }
+    return true;
   }
 
   private void processAndSend(MseBlock block, List<DataBuffer> serializedStats)
@@ -166,6 +174,7 @@ public class GrpcSendingMailbox implements SendingMailbox {
       LOGGER.debug("Already terminated mailbox: {}", _id);
       return;
     }
+    _senderSideClosed = true;
     LOGGER.debug("Cancelling mailbox: {}", _id);
     if (_contentObserver == null) {
       _contentObserver = getContentObserver();
@@ -190,7 +199,10 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
   @Override
   public boolean isTerminated() {
-    return _statusObserver.isFinished();
+    // _senderSideClosed is set when the sending side has attempted to close 
the mailbox (either via complete() or
+    // cancel()). But we also need to return true if the gRPC status observer has 
observed that the connection is closed
+    // (e.g. due to a timeout)
+    return _senderSideClosed || _statusObserver.isFinished();
   }
 
   private StreamObserver<MailboxContent> getContentObserver() {
@@ -314,4 +326,21 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
     return result;
   }
+
+  @Override
+  public void close()
+      throws Exception {
+    if (!isTerminated()) {
+      String errorMsg = "Closing gPRC mailbox without proper EOS message";
+      RuntimeException ex = new RuntimeException(errorMsg);
+      ex.fillInStackTrace();
+      LOGGER.error(errorMsg, ex);
+      _senderSideClosed = true;
+
+      MseBlock errorBlock = ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, 
errorMsg);
+      if (_contentObserver != null) {
+        processAndSend(errorBlock, List.of());
+      }
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 6d8233568fa..c7602564574 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -105,6 +105,7 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
     if (_isTerminated) {
       return;
     }
+    _isTerminated = true;
     LOGGER.debug("Cancelling mailbox: {}", _id);
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
@@ -128,4 +129,19 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
   public String toString() {
     return "m" + _id;
   }
+
+  @Override
+  public void close() {
+    if (!isTerminated()) {
+      String msg = "Closing in-memory mailbox without proper EOS message";
+      RuntimeException exception = new RuntimeException(msg);
+      exception.fillInStackTrace();
+
+      LOGGER.error(msg, exception);
+      if (_receivingMailbox == null) {
+        _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+      }
+      _receivingMailbox.setErrorBlock(ErrorMseBlock.fromException(exception), 
Collections.emptyList());
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 65a0d69831c..2f5161cd15f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -39,7 +39,7 @@ import org.apache.pinot.segment.spi.memory.DataBuffer;
  *   </li>
  * </ol>
  */
-public interface SendingMailbox {
+public interface SendingMailbox extends AutoCloseable {
 
   /**
    * Returns whether the mailbox is sending data to a local receiver, where 
blocks can be directly passed to the
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 0efcd22020a..99400c0dd2d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -124,7 +124,7 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
 
   @Override
   public void onError(Throwable t) {
-    LOGGER.warn("Error on receiver side", t);
+    LOGGER.warn("Receiving mailbox received an error from sender side", t);
     _mailboxBuffers.clear();
     if (_mailbox != null) {
       String msg = t != null ? t.getMessage() : "Unknown";
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
index 4288f4a087a..259109d3d92 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.mailbox.channel;
 
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -67,7 +68,27 @@ public class MailboxStatusObserver implements 
StreamObserver<MailboxStatus> {
 
   @Override
   public void onError(Throwable t) {
-    LOGGER.warn("Error on sender side", t);
+    boolean skipLog = false;
+    if (t instanceof StatusRuntimeException) {
+      switch (((StatusRuntimeException) t).getStatus().getCode()) {
+        case CANCELLED:
+          // If the receiver cancelled the stream, we should not treat it as 
an error.
+          LOGGER.trace("Sending mailbox stream cancelled by receiving side");
+          skipLog = true;
+          break;
+        case DEADLINE_EXCEEDED:
+          // If the request timed out, we should not treat it as an error.
+          LOGGER.trace("Sending mailbox stream deadline exceeded");
+          skipLog = true;
+          break;
+        default:
+          // Other gRPC errors are treated as errors
+          break;
+      }
+    }
+    if (!skipLog) {
+      LOGGER.warn("Sending mailbox received an error from receiving side", t);
+    }
     _finished.set(true);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 5626cd4d3a0..f6f598bd48a 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -325,11 +325,9 @@ public class QueryRunner {
     long deadlineMs = executionContext.getPassiveDeadlineMs();
     for (RoutingInfo routingInfo : routingInfos) {
       String mailboxId = routingInfo.getMailboxId();
-      try {
-        StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
-        SendingMailbox sendingMailbox =
-            _mailboxService.getSendingMailbox(routingInfo.getHostname(), 
routingInfo.getPort(),
-                mailboxId, deadlineMs, statMap);
+      StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
+      try (SendingMailbox sendingMailbox = 
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
+          routingInfo.getPort(), mailboxId, deadlineMs, statMap)) {
         // TODO: Here we are breaking the stats invariants, sending errors 
without including the stats of the
         //  current stage. We will need to fix this in future, but for now, we 
are sending the error block without
         //  the stats.
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index c99c469a90a..02292b180e6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class contains the shared logic across all different exchange types 
for exchanging data across servers.
  */
-public abstract class BlockExchange {
+public abstract class BlockExchange implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BlockExchange.class);
   // TODO: Deduct this value via grpc config maximum byte size; and make it 
configurable with override.
   // TODO: Max block size is a soft limit. only counts fixedSize datatable 
byte buffer
@@ -176,9 +176,27 @@ public abstract class BlockExchange {
 
   protected abstract void route(List<SendingMailbox> destinations, 
MseBlock.Data block);
 
-  // Called when the OpChain gracefully returns.
-  // TODO: This is a no-op right now.
+  @Override
   public void close() {
+    RuntimeException firstException = null;
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      try {
+        sendingMailbox.close();
+      } catch (Exception e) {
+        if (firstException == null) {
+          if (firstException instanceof RuntimeException) {
+            firstException = (RuntimeException) e;
+          } else {
+            firstException = new RuntimeException(e);
+          }
+        } else {
+          firstException.addSuppressed(e);
+        }
+      }
+    }
+    if (firstException != null) {
+      throw firstException;
+    }
   }
 
   public void cancel(Throwable t) {
@@ -253,5 +271,10 @@ public abstract class BlockExchange {
     public String toString() {
       return "e" + _id;
     }
+
+    @Override
+    public void close() {
+      BlockExchange.this.close();
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 6edba101848..28e81f440b8 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -274,6 +274,7 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
     // query pinot
     runQuery(sql, expectErrorMsg, false).ifPresent(queryResult -> {
       try {
+        Assert.assertNull(queryResult.getProcessingException(), "Expected no 
exception");
         compareRowEquals(queryResult.getResultTable(), queryH2(h2Sql), 
keepOutputRowOrder);
       } catch (Exception e) {
         Assert.fail(e.getMessage(), e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to