This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d49bd3c96e0 Remove SendingMailbox.complete method and add that logic 
when EOS is sent (#16915)
d49bd3c96e0 is described below

commit d49bd3c96e09123918c609c0a2f9a2242576744a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Oct 8 17:52:39 2025 +0200

    Remove SendingMailbox.complete method and add that logic when EOS is sent 
(#16915)
---
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 12 +----
 .../query/mailbox/InMemorySendingMailbox.java      |  6 +--
 .../apache/pinot/query/mailbox/SendingMailbox.java | 26 +++++-----
 .../runtime/operator/exchange/BlockExchange.java   | 58 +++++++++++++---------
 .../pinot/query/mailbox/MailboxServiceTest.java    |  5 --
 .../operator/exchange/BlockExchangeTest.java       |  7 ++-
 6 files changed, 52 insertions(+), 62 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index cce43b228f7..4ffdd58e684 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -98,6 +98,8 @@ public class GrpcSendingMailbox implements SendingMailbox {
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException {
     sendInternal(block, serializedStats);
+    LOGGER.debug("Completing mailbox: {}", _id);
+    _contentObserver.onCompleted();
   }
 
   private void sendInternal(MseBlock block, List<DataBuffer> serializedStats)
@@ -140,16 +142,6 @@ public class GrpcSendingMailbox implements SendingMailbox {
     }
   }
 
-  @Override
-  public void complete() {
-    if (isTerminated()) {
-      LOGGER.debug("Already terminated mailbox: {}", _id);
-      return;
-    }
-    LOGGER.debug("Completing mailbox: {}", _id);
-    _contentObserver.onCompleted();
-  }
-
   @Override
   public void cancel(Throwable t) {
     if (isTerminated()) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 43cfb7bf9fe..b566100f276 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -69,6 +69,7 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException {
     sendPrivate(block, serializedStats);
+    _isTerminated = true;
   }
 
   private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
@@ -102,11 +103,6 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
     }
   }
 
-  @Override
-  public void complete() {
-    _isTerminated = true;
-  }
-
   @Override
   public void cancel(Throwable t) {
     if (_isTerminated) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index a3f37007670..d11329f112b 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -28,6 +28,18 @@ import org.apache.pinot.segment.spi.memory.DataBuffer;
 
 /**
  * Mailbox that's used to send data.
+ *
+ * Usages of this interface should follow the pattern:
+ *
+ * <ol>
+ *   <li>Zero or more calls to {@link #send(MseBlock.Data)}</li>
+ *   <li>Then exactly one of:
+ *     <ul>
+ *       <li>One call to {@link #send(MseBlock.Eos, List)} if the receiver is 
not early terminated</li>
+ *       <li>One call to {@link #cancel(Throwable)} if the sender wants to 
cancel the receiver</li>
+ *     </ul>
+ *   </li>
+ * </ol>
  */
 public interface SendingMailbox {
 
@@ -53,20 +65,6 @@ public interface SendingMailbox {
   void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException;
 
-  /**
-   * Called when there is no more data to be sent by the {@link 
BlockExchange}. This is also a signal for the
-   * SendingMailbox that the sender is done sending data from its end. Note 
that this doesn't mean that the receiver
-   * has received all the data.
-   *
-   * <p>
-   * <b>Note:</b> While this is similar to a close() method that's usually 
provided with objects that hold releasable
-   * resources, the key difference is that a SendingMailbox cannot completely 
release the resources on its end
-   * gracefully, since it would be waiting for the receiver to ack that it has 
received all the data. See
-   * {@link #cancel} which can allow callers to force release the underlying 
resources.
-   * </p>
-   */
-  void complete();
-
   /**
    * Cancels the mailbox and notifies the receiver of the cancellation so that 
it can release the underlying resources.
    * No more blocks can be sent after calling this method.
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index ccc170fad27..261c032e965 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -127,7 +127,6 @@ public abstract class BlockExchange {
    */
   public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException {
-    int numMailboxes = _sendingMailboxes.size();
     int mailboxIdToSendMetadata;
     if (!serializedStats.isEmpty()) {
       mailboxIdToSendMetadata = _statsIndexChooser.apply(_sendingMailboxes);
@@ -139,14 +138,36 @@ public abstract class BlockExchange {
       // this may happen when the block exchange is itself used as a sending 
mailbox, like when using spools
       mailboxIdToSendMetadata = -1;
     }
+    Exception firstException = null;
+    int numMailboxes = _sendingMailboxes.size();
     for (int i = 0; i < numMailboxes; i++) {
-      SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
-      List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? 
serializedStats : Collections.emptyList();
+      try {
+        SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? 
serializedStats : Collections.emptyList();
 
-      sendingMailbox.send(eosBlock, statsToSend);
-      sendingMailbox.complete();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Block sent: {} {} to {}", eosBlock, 
System.identityHashCode(eosBlock), sendingMailbox);
+        sendingMailbox.send(eosBlock, statsToSend);
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Block sent: {} {} to {}", eosBlock, 
System.identityHashCode(eosBlock), sendingMailbox);
+        }
+      } catch (IOException | TimeoutException | RuntimeException e) {
+        // We want to try to send EOS to all mailboxes, so we catch the 
exception and rethrow it at the end.
+        if (firstException == null) {
+          firstException = e;
+        } else {
+          firstException.addSuppressed(e);
+        }
+      }
+    }
+    if (firstException != null) {
+      // This is ugly, but necessary to be sure we throw the right exception, 
which is later caught by the
+      // QueryRunner and handled properly.
+      if (firstException instanceof IOException) {
+        throw (IOException) firstException;
+      } else if (firstException instanceof TimeoutException) {
+        throw (TimeoutException) firstException;
+      } else {
+        Preconditions.checkState(firstException instanceof RuntimeException);
+        throw (RuntimeException) firstException;
       }
     }
     return false;
@@ -218,30 +239,19 @@ public abstract class BlockExchange {
     @Override
     public void send(MseBlock.Data data)
         throws IOException, TimeoutException {
-      sendPrivate(data, Collections.emptyList());
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Exchange mailbox {} echoing data block {} {}", this, 
data, System.identityHashCode(data));
+      }
+      _earlyTerminated = BlockExchange.this.send(data);
     }
 
     @Override
     public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
         throws IOException, TimeoutException {
-      sendPrivate(block, serializedStats);
-    }
-
-    private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
-        throws IOException, TimeoutException {
       if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Exchange mailbox {} echoing {} {}", this, block, 
System.identityHashCode(block));
-      }
-      if (block.isData()) {
-        Preconditions.checkArgument(serializedStats.isEmpty(), "Data block 
cannot have stats");
-        _earlyTerminated = BlockExchange.this.send(((MseBlock.Data) block));
-      } else {
-        _earlyTerminated = BlockExchange.this.send(((MseBlock.Eos) block), 
serializedStats);
+        LOGGER.trace("Exchange mailbox {} echoing EOS block {} {}", this, 
block, System.identityHashCode(block));
       }
-    }
-
-    @Override
-    public void complete() {
+      _earlyTerminated = BlockExchange.this.send(block, serializedStats);
       _completed = true;
     }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index b9bd4d3cb4b..f5f1544e31e 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -92,7 +92,6 @@ public class MailboxServiceTest {
       sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));
     }
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
-    sendingMailbox.complete();
 
     ReceivingMailbox receivingMailbox = 
_mailboxService1.getReceivingMailbox(mailboxId);
     receivingMailbox.registeredReader(() -> {
@@ -129,7 +128,6 @@ public class MailboxServiceTest {
       sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));
     }
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
-    sendingMailbox.complete();
 
     assertEquals(numCallbacks.get(), 
ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS);
 
@@ -342,7 +340,6 @@ public class MailboxServiceTest {
       sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));
     }
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
-    sendingMailbox.complete();
 
     // Wait until all the mails are delivered
     ReceivingMailbox receivingMailbox = 
_mailboxService1.getReceivingMailbox(mailboxId);
@@ -388,7 +385,6 @@ public class MailboxServiceTest {
       sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{Integer.toString(i)}));
     }
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
-    sendingMailbox.complete();
 
     // Wait until all the mails are delivered
     receiveMailLatch.await();
@@ -669,7 +665,6 @@ public class MailboxServiceTest {
     sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{"0"}));
     // send a metadata block
     sendingMailbox.send(SuccessMseBlock.INSTANCE, 
MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
-    sendingMailbox.complete();
 
     // sending side should early terminate
     TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(), 
1000L, "Failed to early-terminate sender");
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index e1fb8ab68eb..4ea75fcad74 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.operator.exchange;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -71,16 +70,16 @@ public class BlockExchangeTest {
     BlockExchange exchange = new TestBlockExchange(destinations);
 
     // When:
-    exchange.send(SuccessMseBlock.INSTANCE, Collections.emptyList());
+    exchange.send(SuccessMseBlock.INSTANCE, List.of());
 
     // Then:
     ArgumentCaptor<MseBlock.Eos> captor = 
ArgumentCaptor.forClass(MseBlock.Eos.class);
 
-    Mockito.verify(_mailbox1).complete();
+    Mockito.verify(_mailbox1).send(SuccessMseBlock.INSTANCE, List.of());
     Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture(), 
anyList());
     Assert.assertTrue(captor.getValue().isEos());
 
-    Mockito.verify(_mailbox2).complete();
+    Mockito.verify(_mailbox2).send(SuccessMseBlock.INSTANCE, List.of());
     Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture(), 
anyList());
     Assert.assertTrue(captor.getValue().isEos());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to