Jackie-Jiang commented on code in PR #11746:
URL: https://github.com/apache/pinot/pull/11746#discussion_r1350741074


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java:
##########
@@ -94,7 +98,8 @@ public void onNext(MailboxContent mailboxContent) {
           break;
         case EARLY_TERMINATED:
           LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
-          onCompleted();
+          _responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+              .putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, "TRUE").build());

Review Comment:
   (minor) We usually use lowercase `"true"`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -70,6 +72,13 @@ public String getOperatorId() {
   // Make it protected because we should always call nextBlock()
   protected abstract TransferableBlock getNextBlock();
 
+  protected void setEarlyTerminate() {

Review Comment:
   Rename it to `earlyTerminate()`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -68,25 +67,29 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block)
+  /**
+   * API to send a block to the destination mailboxes.
+   * @param block the block to be transferred
+   * @return true if any of the upstream mailboxes requested EOS (e.g. early termination)
+   * @throws Exception when sending stream unexpectedly closed.
+   */
+  public boolean send(TransferableBlock block)
       throws Exception {
     boolean isEarlyTerminated = true;
     for (SendingMailbox sendingMailbox : _sendingMailboxes) {
-      if (!sendingMailbox.isTerminated()) {
+      if (!sendingMailbox.isEarlyTerminated()) {

Review Comment:
   We can do the `block.isEndOfStreamBlock()` check first, then check if all 
mailboxes are early terminated
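A minimal sketch of the suggested ordering in `send()`, under stated assumptions: `SketchBlock` and `SketchMailbox` are hypothetical stand-ins for `TransferableBlock` and `SendingMailbox`, and data blocks are simply broadcast to every mailbox that has not early terminated, whereas the real exchange applies its own routing. The EOS check comes first and always reaches every receiver; the method then reports `true` only when all receivers have requested early termination.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for TransferableBlock and SendingMailbox; not Pinot's real types.
interface SketchBlock {
  boolean isEndOfStreamBlock();
}

interface SketchMailbox {
  boolean isEarlyTerminated();

  void send(SketchBlock block) throws Exception;
}

class BlockExchangeSketch {
  private final List<SketchMailbox> _sendingMailboxes;

  BlockExchangeSketch(List<SketchMailbox> sendingMailboxes) {
    _sendingMailboxes = new ArrayList<>(sendingMailboxes);
  }

  /**
   * Sends a block to the destination mailboxes.
   *
   * @return true only if all receiving mailboxes have requested early termination
   */
  boolean send(SketchBlock block) throws Exception {
    // 1. EOS blocks always go to every mailbox so each receiver can finish cleanly.
    if (block.isEndOfStreamBlock()) {
      for (SketchMailbox mailbox : _sendingMailboxes) {
        mailbox.send(block);
      }
      return false;
    }
    // 2. Data blocks skip receivers that asked for early termination.
    boolean allEarlyTerminated = true;
    for (SketchMailbox mailbox : _sendingMailboxes) {
      if (!mailbox.isEarlyTerminated()) {
        allEarlyTerminated = false;
        mailbox.send(block);
      }
    }
    return allEarlyTerminated;
  }
}
```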



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -42,6 +42,10 @@ public String toExplainString() {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    return getMultiConsumer().readBlockBlocking();
+    TransferableBlock block = getMultiConsumer().readBlockBlocking();

Review Comment:
   Please add a comment here explaining this logic



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -55,13 +56,15 @@ public void send(TransferableBlock block)
     switch (status) {
       case SUCCESS:
         break;
+      case CANCELLED:
+        throw new EarlyTerminationException(String.format("Mailbox: %s already cancelled from upstream", _id));

Review Comment:
   Use `QueryCancelledException` instead. Also change `MailboxSendOperator` 
accordingly



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java:
##########
@@ -34,12 +34,19 @@ public class MailboxStatusObserver implements StreamObserver<MailboxStatus> {
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
 
   private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+  private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();
   private final AtomicBoolean _finished = new AtomicBoolean();
 
   @Override
   public void onNext(MailboxStatus mailboxStatus) {
-    // when received a mailbox status from the receiving end, sending end update the known buffer size available
-    // so we can make better throughput send judgement. here is a simple example.
+    // when receiving mailbox receives a data block it will return an updated info of the receiving end status including
+    //   1. the buffer size available, for back-pressure handling
+    //   2. status whether there's no need to send any additional data block b/c it considered itself finished.
+    // -- handle early-terminate EOS request.
+    if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)) {

Review Comment:
   Parse the value
   ```suggestion
       if (Boolean.parseBoolean(mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
   ```
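To illustrate why parsing the value is safer than `containsKey`, here is a small sketch with a plain `Map<String, String>` in place of `getMetadataMap()`; the key string is a hypothetical placeholder for `ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE`. `containsKey` returns `true` even if the sender wrote `"false"`, while `Boolean.parseBoolean` returns `true` only for a case-insensitive `"true"` and returns `false` for a missing key (`null`). The writer side uses `Boolean.toString(...)`, which yields the lowercase `"true"` mentioned earlier in the review.

```java
import java.util.HashMap;
import java.util.Map;

final class EarlyTerminateMetadata {
  // Hypothetical placeholder for ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE.
  static final String REQUEST_EARLY_TERMINATE = "requestEarlyTerminate";

  private EarlyTerminateMetadata() {
  }

  // Original check: only looks at whether the key is present, ignoring its value.
  static boolean byKeyPresence(Map<String, String> metadata) {
    return metadata.containsKey(REQUEST_EARLY_TERMINATE);
  }

  // Suggested check: parses the value; a missing key (null) or anything but "true" yields false.
  static boolean byParsedValue(Map<String, String> metadata) {
    return Boolean.parseBoolean(metadata.get(REQUEST_EARLY_TERMINATE));
  }

  // Writer side: Boolean.toString produces the lowercase "true" / "false".
  static Map<String, String> request(boolean earlyTerminate) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(REQUEST_EARLY_TERMINATE, Boolean.toString(earlyTerminate));
    return metadata;
  }
}
```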



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -70,6 +72,13 @@ public String getOperatorId() {
   // Make it protected because we should always call nextBlock()
   protected abstract TransferableBlock getNextBlock();
 
+  protected void setEarlyTerminate() {
+    _isEarlyTerminated = true;
+    for (MultiStageOperator upstreamOperator : getChildOperators()) {

Review Comment:
   (minor) `upstream` is a little confusing; suggest calling it `childOperator`
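With both naming suggestions applied (`earlyTerminate()` instead of `setEarlyTerminate()`, and `childOperator` as the loop variable), the propagation could look roughly like this. `OperatorSketch` and `SimpleOperator` are hypothetical, trimmed-down stand-ins for `MultiStageOperator`; only the flag and the recursion over `getChildOperators()` follow the diff.

```java
import java.util.ArrayList;
import java.util.List;

abstract class OperatorSketch {
  protected boolean _isEarlyTerminated;

  protected abstract List<OperatorSketch> getChildOperators();

  // Marks this operator as early terminated and propagates the request to every child
  // (the operators feeding data into this one).
  protected void earlyTerminate() {
    _isEarlyTerminated = true;
    for (OperatorSketch childOperator : getChildOperators()) {
      childOperator.earlyTerminate();
    }
  }
}

// Hypothetical concrete operator used only to exercise the recursion.
class SimpleOperator extends OperatorSketch {
  private final List<OperatorSketch> _children = new ArrayList<>();

  SimpleOperator(OperatorSketch... children) {
    for (OperatorSketch child : children) {
      _children.add(child);
    }
  }

  @Override
  protected List<OperatorSketch> getChildOperators() {
    return _children;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}
```

Calling `earlyTerminate()` on an inner operator affects only that operator's subtree, which matches the hash join case where only the right-table operator is early terminated.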



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -239,8 +240,10 @@ private void buildBroadcastHashTable()
       }
       _currentRowsInHashTable += container.size();
       if (_currentRowsInHashTable == _maxRowsInHashTable) {
-        // Early terminate right table operator.
-        _rightTableOperator.close();
+        // setting only the rightTableOperator to be early terminated.
+        _rightTableOperator.setEarlyTerminate();
+        // pulling one extra early termination message from rightTable.

Review Comment:
   We can simply remove the `break;` and do an extra loop check
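A sketch of the loop shape suggested for `buildBroadcastHashTable()`, assuming a hypothetical `RightTableInput` where each block is a `List<Integer>` of rows and an empty list stands in for the EOS block; the `rows` list stands in for the hash table. Instead of breaking out and pulling one extra block, the loop keeps its normal EOS check, stops adding rows once the limit is hit, and drops any later data blocks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical right-table input: a block is a list of rows, and an empty list stands in for EOS.
interface RightTableInput {
  List<Integer> nextBlock();

  void earlyTerminate();
}

class HashTableBuildSketch {
  private final int _maxRowsInHashTable;

  HashTableBuildSketch(int maxRowsInHashTable) {
    _maxRowsInHashTable = maxRowsInHashTable;
  }

  List<Integer> build(RightTableInput rightTableInput) {
    List<Integer> rows = new ArrayList<>();
    boolean limitReached = false;
    List<Integer> block = rightTableInput.nextBlock();
    // No break: the loop only exits on EOS, so the trailing EOS block is consumed by the
    // regular loop check instead of a special "pull one extra block" call.
    while (!block.isEmpty()) {
      if (!limitReached) {
        int remaining = _maxRowsInHashTable - rows.size();
        rows.addAll(block.subList(0, Math.min(remaining, block.size())));
        if (rows.size() == _maxRowsInHashTable) {
          limitReached = true;
          // Only the right table input is asked to stop; later data blocks are discarded.
          rightTableInput.earlyTerminate();
        }
      }
      block = rightTableInput.nextBlock();
    }
    return rows;
  }
}
```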



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -64,19 +64,19 @@ public class AggregateOperator extends MultiStageOperator {
       new CountAggregationFunction(Collections.singletonList(ExpressionContext.forIdentifier("*")), false);
   private static final ExpressionContext PLACEHOLDER_IDENTIFIER = ExpressionContext.forIdentifier("__PLACEHOLDER__");
 
-  private final MultiStageOperator _inputOperator;
+  private final MultiStageOperator _upstreamOperator;

Review Comment:
   This is confusing. In the mailbox, upstream is the next operator (e.g. the receiver is upstream of the sender), but here upstream is the input operator. Suggest reverting this change because "input" is clearer



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -33,7 +34,7 @@ public class InMemorySendingMailbox implements SendingMailbox {
   private final long _deadlineMs;
 
   private ReceivingMailbox _receivingMailbox;
-  private volatile boolean _isTerminated;
+  private volatile boolean _isEarlyTerminated;

Review Comment:
   We should still differentiate terminated and early-terminated. When 
`complete()` or `cancel()` is invoked, the mailbox is terminated. 
Early-terminated can only be set via the receiving mailbox response
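A minimal sketch of keeping the two states apart. `SendingMailboxSketch` and its `onReceiverStatus(...)` hook are hypothetical; the only rules taken from the comment are that `complete()` and `cancel()` mark the mailbox terminated, while early termination is set only from the receiving mailbox's response.

```java
class SendingMailboxSketch {
  // Set locally when this side finishes (complete) or aborts (cancel).
  private volatile boolean _isTerminated;
  // Set only when the receiving mailbox reports that it needs no more data.
  private volatile boolean _isEarlyTerminated;

  void complete() {
    _isTerminated = true;
  }

  void cancel() {
    _isTerminated = true;
  }

  // Hypothetical hook invoked with the receiving mailbox's status response.
  void onReceiverStatus(boolean requestEarlyTerminate) {
    if (requestEarlyTerminate) {
      _isEarlyTerminated = true;
    }
  }

  boolean isTerminated() {
    return _isTerminated;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}
```

In this sketch an early-terminated mailbox is not yet terminated, so the sender can still `complete()` it afterwards.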



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -218,7 +218,8 @@ private void buildBroadcastHashTable()
         _resourceLimitExceededException =
             new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
         _resourceLimitExceededException.setMessage(
-            "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable);
+            "Exception occurred when building in-memory hash table for join operator, reach number of rows limit: "

Review Comment:
   (minor) I wouldn't count this as an exception; I personally prefer the existing message



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -147,12 +150,12 @@ protected TransferableBlock getNextBlock() {
     }
   }
 
-  private void sendTransferableBlock(TransferableBlock block)
+  private boolean sendTransferableBlock(TransferableBlock block)
       throws Exception {
-    _exchange.send(block);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[SEND]== Block " + block + " sent from: " + _context.getId());
     }
+    return _exchange.send(block);

Review Comment:
   (minor) Don't change the order. Cache the result because the logger should 
log after the block is sent
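The requested ordering, sketched with a hypothetical `ExchangeSketch` and string blocks; a `Consumer<String>` stands in for `LOGGER.debug(...)` (and the `isDebugEnabled()` guard is omitted) so the example does not depend on SLF4J. The block is sent first, the result is cached in a local, the debug line is logged, and the cached value is returned.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for BlockExchange#send: returns true when all receivers early terminated.
interface ExchangeSketch {
  boolean send(String block) throws Exception;
}

class SendOperatorSketch {
  private final ExchangeSketch _exchange;
  // Stand-in for LOGGER.debug(...), so the sketch does not depend on SLF4J.
  private final Consumer<String> _debugLog;
  private final String _contextId;

  SendOperatorSketch(ExchangeSketch exchange, Consumer<String> debugLog, String contextId) {
    _exchange = exchange;
    _debugLog = debugLog;
    _contextId = contextId;
  }

  boolean sendTransferableBlock(String block) throws Exception {
    // Send first and cache the result, so the log line is only written for a block that was sent.
    boolean isEarlyTerminated = _exchange.send(block);
    _debugLog.accept("==[SEND]== Block " + block + " sent from: " + _contextId);
    return isEarlyTerminated;
  }
}
```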



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java:
##########
@@ -87,7 +87,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
   // List of ranking window functions whose output depends on the ordering of input rows and not on the actual values
   private static final Set<String> RANKING_FUNCTION_NAMES = ImmutableSet.of("RANK", "DENSE_RANK");
 
-  private final MultiStageOperator _inputOperator;
+  private final MultiStageOperator _upstreamOperator;

Review Comment:
   Same here, suggest not changing it



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -117,19 +117,22 @@ public String toExplainString() {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
-      TransferableBlock block = _sourceOperator.nextBlock();
+      TransferableBlock block = _upstreamOperator.nextBlock();
+      boolean isEarlyTerminated;
       if (block.isSuccessfulEndOfStreamBlock()) {
         // Stats need to be populated here because the block is being sent to the mailbox
         // and the receiving opChain will not be able to access the stats from the previous opChain
         TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
             OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
-        sendTransferableBlock(eosBlockWithStats);
+        isEarlyTerminated = sendTransferableBlock(eosBlockWithStats);

Review Comment:
   We don't need to set early terminate when it is already EOS
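A sketch of the control flow this implies for `getNextBlock()`, with hypothetical `BlockSource`/`BlockSink` stand-ins, string blocks where `"EOS"` marks the end of the stream, and an `earlyTerminate()` that only sets a flag. The EOS branch forwards the block and ignores the exchange's return value; only a data block whose send reports that all receivers are done triggers early termination.

```java
// Hypothetical stand-ins: blocks are plain strings and "EOS" marks the end of the stream.
interface BlockSource {
  String nextBlock();
}

interface BlockSink {
  // Returns true only when every receiver has requested early termination.
  boolean send(String block) throws Exception;
}

class MailboxSendSketch {
  private final BlockSource _sourceOperator;
  private final BlockSink _exchange;
  private boolean _isEarlyTerminated;

  MailboxSendSketch(BlockSource sourceOperator, BlockSink exchange) {
    _sourceOperator = sourceOperator;
    _exchange = exchange;
  }

  String getNextBlock() throws Exception {
    String block = _sourceOperator.nextBlock();
    if ("EOS".equals(block)) {
      // Already at end of stream: forward it; there is nothing left to terminate early.
      _exchange.send(block);
    } else if (_exchange.send(block)) {
      // Every receiver asked to stop, so request early termination (here it only sets a flag).
      earlyTerminate();
    }
    return block;
  }

  private void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }
}
```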



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -68,25 +67,29 @@ protected BlockExchange(List<SendingMailbox> sendingMailboxes, BlockSplitter spl
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block)
+  /**
+   * API to send a block to the destination mailboxes.
+   * @param block the block to be transferred
+   * @return true if any of the upstream mailboxes requested EOS (e.g. early termination)

Review Comment:
   This comment is incorrect. It returns true only if all receiving mailboxes are early terminated.
   
   Side comment: do you realize this "upstream" is the opposite of the other "upstream" changes in the PR? Here it refers to the next operator in the data flow, but in other places it refers to the previous operator in the data flow. So I suggest not using "upstream" because it can be confusing



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java:
##########
@@ -164,6 +164,9 @@ private void consumeInputBlocks() {
               _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
               LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId,
                   _context.getId());
+              setEarlyTerminate();
+              // acquire extra metadata block
+              block = _upstreamOperator.nextBlock();

Review Comment:
   This is not very robust. Simply remove the `break` and let it follow the 
regular execution flow
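A sketch of the "no `break`" version of `consumeInputBlocks()`, assuming a hypothetical model where each block is a `List<Integer>` and an empty list stands in for the EOS block. One way to see the robustness concern: data blocks may already be in flight when early termination is requested, so a single extra `nextBlock()` call can return another data block instead of the EOS/metadata block. Staying in the regular loop drains everything up to EOS.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SortConsumeSketch {
  private SortConsumeSketch() {
  }

  // Keeps the first numRowsToKeep rows. Once the limit is reached it requests early termination
  // exactly once, then stays in the loop so in-flight data blocks and the final EOS are consumed.
  static List<Integer> consumeInputBlocks(Iterator<List<Integer>> input, int numRowsToKeep,
      Runnable earlyTerminate) {
    List<Integer> rows = new ArrayList<>();
    boolean terminated = false;
    for (List<Integer> block = input.next(); !block.isEmpty(); block = input.next()) {
      if (terminated) {
        continue;  // data that was already in flight when early termination was requested
      }
      int remaining = numRowsToKeep - rows.size();
      rows.addAll(block.subList(0, Math.min(remaining, block.size())));
      if (rows.size() == numRowsToKeep) {
        terminated = true;
        earlyTerminate.run();
      }
    }
    return rows;
  }
}
```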



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -56,26 +56,26 @@ public class MailboxSendOperator extends MultiStageOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
 
-  private final MultiStageOperator _sourceOperator;
+  private final MultiStageOperator _upstreamOperator;

Review Comment:
   Suggest not changing this name. `upstream` means different things in a control-flow context and a data-flow context



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

