Jackie-Jiang commented on code in PR #11746:
URL: https://github.com/apache/pinot/pull/11746#discussion_r1350741074
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -94,7 +98,8 @@ public void onNext(MailboxContent mailboxContent) {
break;
case EARLY_TERMINATED:
LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
- onCompleted();
+ _responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, "TRUE").build());

Review Comment:
(minor) We usually use lowercase `"true"`

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -70,6 +72,13 @@ public String getOperatorId() {
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();
+ protected void setEarlyTerminate() {

Review Comment:
Rename it to `earlyTerminate()`

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -68,25 +67,29 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
_splitter = splitter;
}
- public void send(TransferableBlock block)
+ /**
+ * API to send a block to the destination mailboxes.
+ * @param block the block to be transferred
+ * @return true if any of the upstream mailboxes requested EOS (e.g. early termination)
+ * @throws Exception when sending stream unexpectedly closed.
+ */
+ public boolean send(TransferableBlock block)
throws Exception {
boolean isEarlyTerminated = true;
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
- if (!sendingMailbox.isTerminated()) {
+ if (!sendingMailbox.isEarlyTerminated()) {

Review Comment:
We can do the `block.isEndOfStreamBlock()` check first, then check if all mailboxes are early terminated

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -42,6 +42,10 @@ public String toExplainString() {
@Override
protected TransferableBlock getNextBlock() {
- return getMultiConsumer().readBlockBlocking();
+ TransferableBlock block = getMultiConsumer().readBlockBlocking();

Review Comment:
Please add a comment here explaining this logic

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -55,13 +56,15 @@ public void send(TransferableBlock block)
switch (status) {
case SUCCESS:
break;
+ case CANCELLED:
+ throw new EarlyTerminationException(String.format("Mailbox: %s already cancelled from upstream", _id));

Review Comment:
Use `QueryCancelledException` instead. Also change `MailboxSendOperator` accordingly

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java:
##########
@@ -34,12 +34,19 @@ public class MailboxStatusObserver implements StreamObserver<MailboxStatus> {
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+ private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();
private final AtomicBoolean _finished = new AtomicBoolean();
@Override
public void onNext(MailboxStatus mailboxStatus) {
- // when received a mailbox status from the receiving end, sending end update the known buffer size available
- // so we can make better throughput send judgement. here is a simple example.
+ // when receiving mailbox receives a data block it will return an updated info of the receiving end status including
+ // 1. the buffer size available, for back-pressure handling
+ // 2. status whether there's no need to send any additional data block b/c it considered itself finished.
+ // -- handle early-terminate EOS request.
+ if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)) {

Review Comment:
Parse the value
```suggestion
if (Boolean.parseBoolean(mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
```

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -70,6 +72,13 @@ public String getOperatorId() {
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();
+ protected void setEarlyTerminate() {
+ _isEarlyTerminated = true;
+ for (MultiStageOperator upstreamOperator : getChildOperators()) {

Review Comment:
(minor) `upstream` is a little bit confusing; I suggest calling it `childOperator`

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -239,8 +240,10 @@ private void buildBroadcastHashTable()
}
_currentRowsInHashTable += container.size();
if (_currentRowsInHashTable == _maxRowsInHashTable) {
- // Early terminate right table operator.
- _rightTableOperator.close();
+ // setting only the rightTableOperator to be early terminated.
+ _rightTableOperator.setEarlyTerminate();
+ // pulling one extra early termination message from rightTable.
Review Comment:
We can simply remove the `break;` and do an extra loop check

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -64,19 +64,19 @@ public class AggregateOperator extends MultiStageOperator {
new CountAggregationFunction(Collections.singletonList(ExpressionContext.forIdentifier("*")), false);
private static final ExpressionContext PLACEHOLDER_IDENTIFIER = ExpressionContext.forIdentifier("__PLACEHOLDER__");
- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;

Review Comment:
This is confusing. In the mailbox, upstream is the next operator (e.g. receiver is upstream for sender), but here upstream is the input operator. Suggest reverting this change because `input` is clearer

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -33,7 +34,7 @@ public class InMemorySendingMailbox implements SendingMailbox {
private final long _deadlineMs;
private ReceivingMailbox _receivingMailbox;
- private volatile boolean _isTerminated;
+ private volatile boolean _isEarlyTerminated;

Review Comment:
We should still differentiate terminated and early-terminated. When `complete()` or `cancel()` is invoked, the mailbox is terminated.
Early-terminated can only be set via the receiving mailbox response

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -218,7 +218,8 @@ private void buildBroadcastHashTable()
_resourceLimitExceededException = new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
_resourceLimitExceededException.setMessage(
- "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
+ "Exception occurred when building in-memory hash table for join operator, reach number of rows limit: "

Review Comment:
(minor) I wouldn't count this as an exception; I personally prefer the existing one

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -147,12 +150,12 @@ protected TransferableBlock getNextBlock() {
}
}
- private void sendTransferableBlock(TransferableBlock block)
+ private boolean sendTransferableBlock(TransferableBlock block)
throws Exception {
- _exchange.send(block);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[SEND]== Block " + block + " sent from: " + _context.getId());
}
+ return _exchange.send(block);

Review Comment:
(minor) Don't change the order.
Cache the result because the logger should log after the block is sent

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java:
##########
@@ -87,7 +87,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
// List of ranking window functions whose output depends on the ordering of input rows and not on the actual values
private static final Set<String> RANKING_FUNCTION_NAMES = ImmutableSet.of("RANK", "DENSE_RANK");
- private final MultiStageOperator _inputOperator;
+ private final MultiStageOperator _upstreamOperator;

Review Comment:
Same here, suggest not changing it

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -117,19 +117,22 @@ public String toExplainString() {
@Override
protected TransferableBlock getNextBlock() {
try {
- TransferableBlock block = _sourceOperator.nextBlock();
+ TransferableBlock block = _upstreamOperator.nextBlock();
+ boolean isEarlyTerminated;
if (block.isSuccessfulEndOfStreamBlock()) {
// Stats need to be populated here because the block is being sent to the mailbox
// and the receiving opChain will not be able to access the stats from the previous opChain
TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- sendTransferableBlock(eosBlockWithStats);
+ isEarlyTerminated = sendTransferableBlock(eosBlockWithStats);

Review Comment:
We don't need to set early terminate when it is already EOS

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -68,25 +67,29 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
_splitter = splitter;
}
- public void send(TransferableBlock block)
+ /**
+ * API to send a block to the destination mailboxes.
+ * @param block the block to be transferred
+ * @return true if any of the upstream mailboxes requested EOS (e.g. early termination)

Review Comment:
This comment is incorrect. It returns true only if all receiving mailboxes are early terminated.
Side comment: do you realize this `upstream` is the opposite of the other `upstream` changes in the PR? In this place it refers to the next operator in the data flow, but in other places it refers to the previous operator in the data flow. Thus I suggest not using `upstream` because it can be confusing

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java:
##########
@@ -164,6 +164,9 @@ private void consumeInputBlocks() {
_rows.addAll(container.subList(0, _numRowsToKeep - numRows));
LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId, _context.getId());
+ setEarlyTerminate();
+ // acquire extra metadata block
+ block = _upstreamOperator.nextBlock();

Review Comment:
This is not very robust. Simply remove the `break` and let it follow the regular execution flow

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -56,26 +56,26 @@ public class MailboxSendOperator extends MultiStageOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
private static final String EXPLAIN_NAME = "MAILBOX_SEND";
- private final MultiStageOperator _sourceOperator;
+ private final MultiStageOperator _upstreamOperator;

Review Comment:
Suggest not changing this name. `upstream` is different in control flow context and data flow context

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org
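Several comments in this review describe a single contract, and they can be read together. The `BlockExchange.send`, `MailboxStatusObserver.onNext` and `InMemorySendingMailbox` comments say the following. A sending mailbox keeps "terminated" (set by `complete()` or `cancel()`) separate from "early terminated" (set only from the receiver's status response). The early-terminate flag is parsed with `Boolean.parseBoolean` instead of being detected by key presence. `send` checks for EOS first and returns true only when every receiving mailbox has asked to stop.

The Java sketch below is one reading of those suggestions. It is not the PR's implementation. `MailboxExchangeSketch`, its nested `SendingMailbox`, the `REQUEST_EARLY_TERMINATE` key value, and the string-typed blocks are all hypothetical.

```java
import java.util.List;
import java.util.Map;

// A minimal sketch under stated assumptions: hypothetical stand-ins, not Pinot's real
// SendingMailbox/BlockExchange classes. Blocks are modeled as plain strings.
final class MailboxExchangeSketch {
  // Hypothetical key playing the role of ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE.
  static final String REQUEST_EARLY_TERMINATE = "REQUEST_EARLY_TERMINATE";

  static final class SendingMailbox {
    // Set locally when complete() or cancel() is invoked.
    private volatile boolean _isTerminated;
    // Set only from a status response sent back by the receiving mailbox.
    private volatile boolean _isEarlyTerminated;
    private int _numBlocksSent;

    void send(String block) {
      if (_isTerminated) {
        throw new IllegalStateException("Mailbox already terminated, cannot send: " + block);
      }
      _numBlocksSent++; // actual delivery is elided in this sketch
    }

    void complete() {
      _isTerminated = true;
    }

    void cancel() {
      _isTerminated = true;
    }

    // Parse the value instead of testing key presence, so "false" or a missing key is ignored.
    // Boolean.parseBoolean is case-insensitive and returns false for null.
    void onStatus(Map<String, String> metadata) {
      if (Boolean.parseBoolean(metadata.get(REQUEST_EARLY_TERMINATE))) {
        _isEarlyTerminated = true;
      }
    }

    boolean isTerminated() {
      return _isTerminated;
    }

    boolean isEarlyTerminated() {
      return _isEarlyTerminated;
    }

    int numBlocksSent() {
      return _numBlocksSent;
    }
  }

  /**
   * Sends a block to the destination mailboxes.
   *
   * @return true only if all receiving mailboxes have requested early termination
   */
  static boolean send(List<SendingMailbox> mailboxes, String block, boolean isEndOfStream) {
    if (isEndOfStream) {
      // EOS check first: in this sketch every receiver gets the EOS block, and the
      // caller does not need to set early terminate once the stream is already finished.
      for (SendingMailbox mailbox : mailboxes) {
        mailbox.send(block);
      }
      return false;
    }
    boolean allEarlyTerminated = true;
    for (SendingMailbox mailbox : mailboxes) {
      if (!mailbox.isEarlyTerminated()) {
        mailbox.send(block);
        allEarlyTerminated = false;
      }
    }
    return allEarlyTerminated;
  }

  public static void main(String[] args) {
    SendingMailbox a = new SendingMailbox();
    SendingMailbox b = new SendingMailbox();
    List<SendingMailbox> mailboxes = List.of(a, b);
    a.onStatus(Map.of(REQUEST_EARLY_TERMINATE, "true"));
    System.out.println(send(mailboxes, "block-1", false)); // false: b still accepts data
    b.onStatus(Map.of(REQUEST_EARLY_TERMINATE, "true"));
    System.out.println(send(mailboxes, "block-2", false)); // true: every receiver asked to stop
  }
}
```

In `main`, the first call returns `false` because `b` still accepts data. The second call returns `true` once both receivers have asked to stop. `Boolean.parseBoolean` ignores case, so the `"TRUE"` value currently sent by `MailboxContentObserver` would also parse correctly. Even so, lowercase `"true"` matches the convention the reviewer mentions.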
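The `SortOperator` and `HashJoinOperator` comments ask the author to drop the `break` and the manual extra `nextBlock()` read, and to let the regular loop drain the input until its EOS block arrives. The `MultiStageOperator` comments ask for the method to be named `earlyTerminate()` and for the loop variable to be named `childOperator`.

The following is a minimal sketch of that flow. It assumes a hypothetical operator model rather than Pinot's real classes: `Operator`, `SourceOperator`, `LimitOperator`, and a `Block` record standing in for `TransferableBlock`.

```java
import java.util.ArrayList;
import java.util.List;

// A minimal sketch under stated assumptions: a hypothetical operator model, not Pinot's MultiStageOperator.
final class OperatorEarlyTerminationSketch {

  // A block is either a batch of rows or an end-of-stream (EOS) marker.
  record Block(List<Integer> rows, boolean isEndOfStream) {
    static Block data(List<Integer> rows) {
      return new Block(rows, false);
    }

    static Block eos() {
      return new Block(List.of(), true);
    }
  }

  abstract static class Operator {
    protected volatile boolean _isEarlyTerminated;

    abstract Block nextBlock();

    abstract List<Operator> getChildOperators();

    // Marks this operator and, recursively, all of its child operators as early terminated.
    protected void earlyTerminate() {
      _isEarlyTerminated = true;
      for (Operator childOperator : getChildOperators()) {
        childOperator.earlyTerminate();
      }
    }
  }

  // Leaf operator that emits numbered rows in fixed-size batches and answers with EOS once early terminated.
  static final class SourceOperator extends Operator {
    private final int _batchSize;
    private final int _numBatches;
    private int _numBatchesEmitted;
    int _numBlocksServed;

    SourceOperator(int batchSize, int numBatches) {
      _batchSize = batchSize;
      _numBatches = numBatches;
    }

    @Override
    Block nextBlock() {
      _numBlocksServed++;
      if (_isEarlyTerminated || _numBatchesEmitted == _numBatches) {
        return Block.eos();
      }
      List<Integer> rows = new ArrayList<>();
      for (int i = 0; i < _batchSize; i++) {
        rows.add(_numBatchesEmitted * _batchSize + i);
      }
      _numBatchesEmitted++;
      return Block.data(rows);
    }

    @Override
    List<Operator> getChildOperators() {
      return List.of();
    }
  }

  // Keeps the first `limit` rows. There is no `break`: the loop always runs until the input returns EOS.
  static final class LimitOperator extends Operator {
    private final Operator _inputOperator;
    private final int _limit;
    private boolean _isDone;

    LimitOperator(Operator inputOperator, int limit) {
      _inputOperator = inputOperator;
      _limit = limit;
    }

    @Override
    Block nextBlock() {
      if (_isDone) {
        return Block.eos();
      }
      _isDone = true;
      List<Integer> rows = new ArrayList<>();
      Block block = _inputOperator.nextBlock();
      while (!block.isEndOfStream()) {
        if (rows.size() < _limit) {
          List<Integer> batch = block.rows();
          rows.addAll(batch.subList(0, Math.min(batch.size(), _limit - rows.size())));
          if (rows.size() == _limit) {
            // Only the input is early terminated; it still answers with EOS, which ends the loop normally.
            _inputOperator.earlyTerminate();
          }
        }
        block = _inputOperator.nextBlock();
      }
      return Block.data(rows);
    }

    @Override
    List<Operator> getChildOperators() {
      return List.of(_inputOperator);
    }
  }

  public static void main(String[] args) {
    SourceOperator source = new SourceOperator(4, 5);
    LimitOperator limit = new LimitOperator(source, 6);
    System.out.println(limit.nextBlock().rows() + " after " + source._numBlocksServed + " input blocks");
  }
}
```

Running `main` prints `[0, 1, 2, 3, 4, 5] after 3 input blocks`. The limit is reached during the second batch, so the input is early terminated. The third `nextBlock()` call then returns EOS, which ends the loop without any special-case read. In the PR, the extra block pulled after early termination is described as a metadata block. Consuming it through the normal loop path keeps that handling in one place.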