Jackie-Jiang commented on code in PR #11746: URL: https://github.com/apache/pinot/pull/11746#discussion_r1355940162
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java: ########## @@ -49,6 +50,8 @@ public class ReceivingMailbox { // TODO: Revisit if this is the correct way to apply back pressure private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS); private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>(); + private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false); Review Comment: (minor) This can be defined as a volatile boolean ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java: ########## @@ -34,12 +34,20 @@ public class MailboxStatusObserver implements StreamObserver<MailboxStatus> { private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5; private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY); + private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(); Review Comment: (minor) This can be defined as a volatile boolean ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java: ########## @@ -118,19 +118,24 @@ public String toExplainString() { protected TransferableBlock getNextBlock() { try { TransferableBlock block = _sourceOperator.nextBlock(); + boolean isEarlyTerminated; if (block.isSuccessfulEndOfStreamBlock()) { // Stats need to be populated here because the block is being sent to the mailbox // and the receiving opChain will not be able to access the stats from the previous opChain TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); sendTransferableBlock(eosBlockWithStats); + // when sending an EOS block already, early termination flag is ignored even if receiver has requested it. + isEarlyTerminated = false; } else { - sendTransferableBlock(block); + isEarlyTerminated = sendTransferableBlock(block); Review Comment: Can be simplified (remove `isEarlyTerminated`) ```suggestion if (sendTransferableBlock(block)) { earlyTerminate(); } ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java: ########## @@ -71,6 +71,16 @@ public void cancel(Throwable t) { cancelRemainingMailboxes(); } + public void earlyTerminate() { + earlyTerminateMailboxes(); + } + + protected void earlyTerminateMailboxes() { Review Comment: Any specific reason why making this a separate method? ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java: ########## @@ -70,11 +74,12 @@ public void send(TransferableBlock block) @Override public void complete() { + _isTerminated = true; } @Override public void cancel(Throwable t) { - if (_isTerminated) { + if (_isEarlyTerminated || _isTerminated) { Review Comment: We should not change this. Cancel should be applied even if early terminate is called -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org