gortiz commented on code in PR #16915:
URL: https://github.com/apache/pinot/pull/16915#discussion_r2409844037
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +138,29 @@ public boolean send(MseBlock.Eos eosBlock,
List<DataBuffer> serializedStats)
// this may happen when the block exchange is itself used as a sending
mailbox, like when using spools
mailboxIdToSendMetadata = -1;
}
+ RuntimeException firstException = null;
+ int numMailboxes = _sendingMailboxes.size();
for (int i = 0; i < numMailboxes; i++) {
- SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
- List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : Collections.emptyList();
+ try {
+ SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+ List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : Collections.emptyList();
- sendingMailbox.send(eosBlock, statsToSend);
- sendingMailbox.complete();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Block sent: {} {} to {}", eosBlock,
System.identityHashCode(eosBlock), sendingMailbox);
+ sendingMailbox.send(eosBlock, statsToSend);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Block sent: {} {} to {}", eosBlock,
System.identityHashCode(eosBlock), sendingMailbox);
+ }
+ } catch (IOException | TimeoutException | RuntimeException e) {
+ // We want to try to send EOS to all mailboxes, so we catch the
exception and rethrow it at the end.
+ if (firstException == null) {
+ firstException = new RuntimeException("Failed to send EOS block to
mailbox #" + i, e);
Review Comment:
Changed. The code is ugly now, but it should be correct.
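
For reference, the pattern in the hunk above (try every mailbox, remember the first failure, rethrow it after the loop) can be sketched in isolation roughly like this. `Mailbox`, `sendToAll` and the `String` block are hypothetical stand-ins for illustration only, not the Pinot types or the code in this PR:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class EosSendSketch {
  /** Hypothetical stand-in for a sending mailbox; not the Pinot SendingMailbox interface. */
  interface Mailbox {
    void send(String block) throws IOException, TimeoutException;
  }

  /** Tries every mailbox, keeps the first failure, and rethrows it once the loop is done. */
  static void sendToAll(List<Mailbox> mailboxes, String block) {
    RuntimeException firstException = null;
    for (int i = 0; i < mailboxes.size(); i++) {
      try {
        mailboxes.get(i).send(block);
      } catch (IOException | TimeoutException | RuntimeException e) {
        // Keep going so the remaining mailboxes still receive the block.
        if (firstException == null) {
          firstException = new RuntimeException("Failed to send EOS block to mailbox #" + i, e);
        }
      }
    }
    if (firstException != null) {
      throw firstException;
    }
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    List<Mailbox> mailboxes = List.of(
        block -> delivered.add("0:" + block),
        block -> { throw new IOException("connection reset"); },
        block -> delivered.add("2:" + block));
    try {
      sendToAll(mailboxes, "EOS");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
    }
    // Mailbox 2 is still reached even though mailbox 1 failed.
    System.out.println(delivered);
  }
}
```

Only the first failure is kept in this sketch; later failures are dropped, which matches the `firstException == null` check in the diff.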
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -139,16 +138,29 @@ public boolean send(MseBlock.Eos eosBlock,
List<DataBuffer> serializedStats)
// this may happen when the block exchange is itself used as a sending
mailbox, like when using spools
mailboxIdToSendMetadata = -1;
}
+ RuntimeException firstException = null;
+ int numMailboxes = _sendingMailboxes.size();
for (int i = 0; i < numMailboxes; i++) {
- SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
- List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : Collections.emptyList();
+ try {
+ SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+ List<DataBuffer> statsToSend = i == mailboxIdToSendMetadata ?
serializedStats : Collections.emptyList();
- sendingMailbox.send(eosBlock, statsToSend);
- sendingMailbox.complete();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Block sent: {} {} to {}", eosBlock,
System.identityHashCode(eosBlock), sendingMailbox);
+ sendingMailbox.send(eosBlock, statsToSend);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Block sent: {} {} to {}", eosBlock,
System.identityHashCode(eosBlock), sendingMailbox);
+ }
+ } catch (IOException | TimeoutException | RuntimeException e) {
Review Comment:
Right now it behaves the same, but it is more specific. If in the future we
throw more (or fewer) exceptions, compilation will fail, which forces us to
revisit this code and confirm that the newly added exception should be handled
this way.
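
A minimal sketch of that trade-off, assuming a hypothetical `Sender` interface whose `throws` clause plays the role of the mailbox `send` call (none of these names come from Pinot):

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CatchSpecificitySketch {
  /** Hypothetical sender used only for illustration. */
  interface Sender {
    void send() throws IOException, TimeoutException;
  }

  /** Multi-catch: names exactly the checked exceptions send() declares, plus RuntimeException. */
  static String sendSpecific(Sender sender) {
    try {
      sender.send();
      return "ok";
    } catch (IOException | TimeoutException | RuntimeException e) {
      // If send() later declared another checked exception, this method would stop compiling
      // (unreported exception). If send() stopped declaring IOException, javac would reject
      // this catch because that checked exception can no longer be thrown in the try body.
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  /** Broad catch: same behavior today, but it silently absorbs any exception type added later. */
  static String sendBroad(Sender sender) {
    try {
      sender.send();
      return "ok";
    } catch (Exception e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    Sender failing = () -> { throw new TimeoutException("deadline exceeded"); };
    System.out.println(sendSpecific(failing));
    System.out.println(sendBroad(failing));
    System.out.println(sendSpecific(() -> { }));
  }
}
```

Both methods return the same result for every exception `send()` can throw today; the difference only shows up at compile time when the `throws` clause changes.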