Jackie-Jiang commented on code in PR #16915:
URL: https://github.com/apache/pinot/pull/16915#discussion_r2408976983


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -237,14 +249,10 @@ private void sendPrivate(MseBlock block, List<DataBuffer> serializedStats)
         _earlyTerminated = BlockExchange.this.send(((MseBlock.Data) block));
       } else {
         _earlyTerminated = BlockExchange.this.send(((MseBlock.Eos) block), serializedStats);
+        _completed = true;

Review Comment:
   Should we set it in `public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)`?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -68,7 +68,11 @@ public void send(MseBlock.Data data)
   @Override
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats)
       throws IOException, TimeoutException {
-    sendPrivate(block, serializedStats);
+    try {
+      sendPrivate(block, serializedStats);
+    } finally {
+      _isTerminated = true;

Review Comment:
   Should we put this in the `finally` block? This could prevent the error block
   from being set in `cancel()` when `sendPrivate()` throws an exception.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +138,29 @@ public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
       // this may happen when the block exchange is itself used as a sending mailbox, like when using spools
       mailboxIdToSendMetadata = -1;
     }
+    RuntimeException firstException = null;
+    int numMailboxes = _sendingMailboxes.size();
     for (int i = 0; i < numMailboxes; i++) {
-      SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
-      List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
+      try {
+        SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
 
-      sendingMailbox.send(eosBlock, statsToSend);
-      sendingMailbox.complete();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        sendingMailbox.send(eosBlock, statsToSend);
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        }
+      } catch (IOException | TimeoutException | RuntimeException e) {

Review Comment:
   Is this the same as `catch(Exception e)`?
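
   A minimal sketch of the comparison, assuming the `try` body can only throw
   `IOException`, `TimeoutException`, or unchecked exceptions. The names
   `CatchFormsSketch` and `Action` are hypothetical and not part of Pinot. Under
   that assumption both forms should catch the same set of exceptions, and
   neither catches an `Error`.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

class CatchFormsSketch {
  /** Hypothetical stand-in for a call that declares the two checked exceptions. */
  interface Action {
    void run() throws IOException, TimeoutException;
  }

  /** Multi-catch form, as written in the PR. */
  static String withMultiCatch(Action action) {
    try {
      action.run();
      return "ok";
    } catch (IOException | TimeoutException | RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  /** Single catch form suggested in the review. */
  static String withCatchException(Action action) {
    try {
      action.run();
      return "ok";
    } catch (Exception e) {
      return e.getClass().getSimpleName();
    }
  }
}
```

   The two forms would start to differ only if the body later declared another
   checked exception: the multi-catch would then fail to compile until that type
   is handled, while `catch (Exception e)` would silently absorb it.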



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java:
##########
@@ -76,11 +76,11 @@ public void shouldSendEosBlockToAllDestinations()
     // Then:
     ArgumentCaptor<MseBlock.Eos> captor = ArgumentCaptor.forClass(MseBlock.Eos.class);
 
-    Mockito.verify(_mailbox1).complete();
+    Mockito.verify(_mailbox1).send(SuccessMseBlock.INSTANCE, Collections.emptyList());

Review Comment:
   (nit) Let's use `List.of()`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +138,29 @@ public boolean send(MseBlock.Eos eosBlock, List<DataBuffer> serializedStats)
       // this may happen when the block exchange is itself used as a sending mailbox, like when using spools
       mailboxIdToSendMetadata = -1;
     }
+    RuntimeException firstException = null;
+    int numMailboxes = _sendingMailboxes.size();
     for (int i = 0; i < numMailboxes; i++) {
-      SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
-      List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
+      try {
+        SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+        List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ? serializedStats : Collections.emptyList();
 
-      sendingMailbox.send(eosBlock, statsToSend);
-      sendingMailbox.complete();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        sendingMailbox.send(eosBlock, statsToSend);
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Block sent: {} {} to {}", eosBlock, System.identityHashCode(eosBlock), sendingMailbox);
+        }
+      } catch (IOException | TimeoutException | RuntimeException e) {
+        // We want to try to send EOS to all mailboxes, so we catch the exception and rethrow it at the end.
+        if (firstException == null) {
+          firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);

Review Comment:
   This prevents checked exceptions from being thrown. We check the exception
   type in `QueryRunner` and handle each type differently.
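
   One possible way to keep the original type while still attempting every
   mailbox, sketched under assumptions: `EosSendSketch`, `Mailbox`, and
   `sendEosToAll` are hypothetical stand-ins rather than Pinot classes, and the
   enclosing method is assumed to be allowed to declare `IOException` and
   `TimeoutException`. The first failure is remembered as-is, later failures are
   attached as suppressed, and the first one is rethrown with its own type.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

class EosSendSketch {
  /** Hypothetical stand-in for a sending mailbox; only the EOS send is modeled. */
  interface Mailbox {
    void sendEos() throws IOException, TimeoutException;
  }

  /** Tries every mailbox, then rethrows the first failure without wrapping it. */
  static void sendEosToAll(List<Mailbox> mailboxes)
      throws IOException, TimeoutException {
    Exception first = null;
    for (Mailbox mailbox : mailboxes) {
      try {
        mailbox.sendEos();
      } catch (IOException | TimeoutException | RuntimeException e) {
        if (first == null) {
          first = e;               // keep the original exception, not a wrapper
        } else {
          first.addSuppressed(e);  // later failures stay visible in the stack trace
        }
      }
    }
    // Rethrow with the original static type so callers can branch on it.
    if (first instanceof IOException) {
      throw (IOException) first;
    }
    if (first instanceof TimeoutException) {
      throw (TimeoutException) first;
    }
    if (first instanceof RuntimeException) {
      throw (RuntimeException) first;
    }
  }
}
```

   With this shape the mailbox index is no longer in the exception message; if
   that context matters, it could be logged at the point where the failure is
   caught.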



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

