Copilot commented on code in PR #16915:
URL: https://github.com/apache/pinot/pull/16915#discussion_r2402296568


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +139,28 @@ public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
       // this may happen when the block exchange is itself used as a sending mailbox, like when using spools
       mailboxIdToSendMetadata = -1;
     }
+    RuntimeException firstException = null;
     for (int i = 0; i < numMailboxes; i++) {
-      SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
-      List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
+      try {
+        SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
 
-      sendingMailbox.send(eosBlock, statsToSend);
-      sendingMailbox.complete();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        sendingMailbox.send(eosBlock, statsToSend);
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        }
+      } catch (IOException | TimeoutException e) {
+        // We want to try to send EOS to all mailboxes, so we catch the exception and rethrow it at the end.
+        if (firstException == null) {
+          firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);
+        } else {
+          firstException.addSuppressed(e);
+        }
       }

Review Comment:
   The exception handling only catches `IOException` and `TimeoutException`,
but `send()` can also throw a `RuntimeException` (for example, from the new
completion logic in `GrpcSendingMailbox.send()`). A `RuntimeException` thrown
by one mailbox's send escapes the loop and prevents EOS from being sent to the
remaining mailboxes, which defeats the stated goal of trying to send EOS to
all of them.
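A minimal sketch of the broader catch this comment implies, assuming a hypothetical `Mailbox` interface in place of Pinot's `SendingMailbox` and a plain `String` in place of `MseBlock.Eos`. Catching `Exception` records a `RuntimeException` from one mailbox the same way as the checked failures, so the loop still reaches every remaining mailbox before rethrowing:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EosBroadcastSketch {

  /** Hypothetical stand-in for Pinot's SendingMailbox; only the EOS send is modelled. */
  interface Mailbox {
    void sendEos(String eos) throws Exception;
  }

  /**
   * Tries to send EOS to every mailbox. Catching Exception (not only IOException and
   * TimeoutException) means a RuntimeException from one mailbox no longer stops the loop.
   * The first failure becomes the thrown exception; later ones are attached as suppressed.
   */
  static void sendEosToAll(List<Mailbox> mailboxes, String eos) {
    RuntimeException firstException = null;
    for (int i = 0; i < mailboxes.size(); i++) {
      try {
        mailboxes.get(i).sendEos(eos);
      } catch (Exception e) {
        if (firstException == null) {
          firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);
        } else {
          firstException.addSuppressed(e);
        }
      }
    }
    // Rethrow only after every mailbox has been attempted.
    if (firstException != null) {
      throw firstException;
    }
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    List<Mailbox> mailboxes = new ArrayList<>();
    mailboxes.add(eos -> { throw new IllegalStateException("mailbox 0 already closed"); });
    mailboxes.add(delivered::add); // healthy mailbox
    mailboxes.add(eos -> { throw new IOException("mailbox 2 unreachable"); });
    try {
      sendEosToAll(mailboxes, "EOS");
    } catch (RuntimeException e) {
      // prints "Failed to send EOS block to mailbox #0, suppressed=1"
      System.out.println(e.getMessage() + ", suppressed=" + e.getSuppressed().length);
    }
    System.out.println("delivered=" + delivered); // prints "delivered=[EOS]"
  }
}
```

`Error`s are deliberately not caught by `catch (Exception e)`, so conditions such as `OutOfMemoryError` still propagate immediately instead of being folded into the aggregated failure.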



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -97,7 +97,12 @@ public void send(MseBlock.Data data)
   @Override
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException {
-    sendInternal(block, serializedStats);
+    try {
+      sendInternal(block, serializedStats);
+    } finally {
+      LOGGER.debug("Completing mailbox: {}", _id);
+      _contentObserver.onCompleted();
+    }

Review Comment:
   The `_contentObserver.onCompleted()` call now runs even when `sendInternal()`
throws. If the observer has already been terminated with `onError()`, this
calls `onCompleted()` after a terminal signal, which violates the gRPC
`StreamObserver` contract (only one of `onError()` and `onCompleted()` may be
called, and it must be the last call on the observer). Consider checking
whether the observer is still in a valid state before calling `onCompleted()`,
or completing it only after a successful send.
   ```suggestion
       sendInternal(block, serializedStats);
       LOGGER.debug("Completing mailbox: {}", _id);
       _contentObserver.onCompleted();
   ```
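A minimal sketch contrasting the PR's `finally` pattern with the suggested success-only completion. `Observer`, `RecordingObserver`, `Sender` and `failingSender` are hypothetical: `Observer` mirrors only the terminal methods of gRPC's `StreamObserver`, and the recording observer throws `IllegalStateException` on a second terminal call to make the contract violation visible (a real gRPC observer may behave differently). It also assumes the failed send has already signalled `onError()`. Under those assumptions, the `finally` version throws from `onCompleted()` and the original `IOException` is lost, while the success-only version propagates the `IOException`:

```java
import java.io.IOException;

public class CompleteOnSuccessSketch {

  /** Hypothetical minimal mirror of the terminal methods of io.grpc.stub.StreamObserver. */
  interface Observer {
    void onError(Throwable t);

    void onCompleted();
  }

  /** Test observer that rejects a second terminal call, as the StreamObserver contract requires. */
  static final class RecordingObserver implements Observer {
    String terminal; // "error", "completed", or null while the stream is still open

    @Override
    public void onError(Throwable t) {
      ensureOpen();
      terminal = "error";
    }

    @Override
    public void onCompleted() {
      ensureOpen();
      terminal = "completed";
    }

    private void ensureOpen() {
      if (terminal != null) {
        throw new IllegalStateException("stream already terminated with " + terminal);
      }
    }
  }

  /** Hypothetical stand-in for sendInternal(block, serializedStats). */
  interface Sender {
    void sendInternal() throws IOException;
  }

  /** Assumed failure mode: the transport has already signalled onError before the send throws. */
  static Sender failingSender(Observer observer) {
    return () -> {
      observer.onError(new IOException("stream reset"));
      throw new IOException("stream reset");
    };
  }

  /** Pattern in the PR: onCompleted() runs in finally, even after a failed send. */
  static void sendEosWithFinally(Sender sender, Observer observer) throws IOException {
    try {
      sender.sendInternal();
    } finally {
      observer.onCompleted();
    }
  }

  /** Pattern from the suggestion: onCompleted() runs only after a successful send. */
  static void sendEosOnSuccess(Sender sender, Observer observer) throws IOException {
    sender.sendInternal();
    observer.onCompleted();
  }

  public static void main(String[] args) {
    RecordingObserver a = new RecordingObserver();
    try {
      sendEosWithFinally(failingSender(a), a);
    } catch (Exception e) {
      // prints "finally: IllegalStateException" (the exception from finally replaces the IOException)
      System.out.println("finally: " + e.getClass().getSimpleName());
    }
    RecordingObserver b = new RecordingObserver();
    try {
      sendEosOnSuccess(failingSender(b), b);
    } catch (Exception e) {
      // prints "on success only: IOException"
      System.out.println("on success only: " + e.getClass().getSimpleName());
    }
  }
}
```

One trade-off of completing only on success: if `sendInternal()` fails without having signalled `onError()`, the stream stays open unless some other code path terminates it.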



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
