This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c360aae1 [multistage] Early terminate SortOperator if there is a limit (#11334)
b0c360aae1 is described below

commit b0c360aae186cce8940ce54fffe4532adc7fccfb
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Thu Aug 17 00:41:12 2023 -0700

    [multistage] Early terminate SortOperator if there is a limit (#11334)
---
 .../src/test/resources/queries/JoinPlans.json      | 19 ++++++++
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 46 +++++++++++--------
 .../query/mailbox/InMemorySendingMailbox.java      | 28 +++++++++++-
 .../pinot/query/mailbox/ReceivingMailbox.java      | 28 +++++++-----
 .../apache/pinot/query/mailbox/SendingMailbox.java |  6 +++
 .../mailbox/channel/MailboxContentObserver.java    | 28 +++++++++---
 .../query/runtime/blocks/TransferableBlock.java    |  7 ---
 .../runtime/operator/MailboxSendOperator.java      | 51 ++++++++++++----------
 .../pinot/query/runtime/operator/SortOperator.java | 15 ++++---
 .../runtime/operator/exchange/BlockExchange.java   | 15 ++++++-
 .../pinot/query/mailbox/MailboxServiceTest.java    | 16 ++++---
 11 files changed, 178 insertions(+), 81 deletions(-)

diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json 
b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 94a53aec08..cf8537c16c 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -402,6 +402,25 @@
           "\n            LogicalTableScan(table=[[b]])",
           "\n"
         ]
+      },
+      {
+        "description": "Inner join with limit",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON 
a.col1 = b.col2 LIMIT 100",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[100])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[100])",
+          "\n      LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
+          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$0], ts=[$6])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$1], col3=[$2])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
       }
     ]
   },
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 25cc337bf8..47b76b7e12 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.stub.StreamObserver;
@@ -61,35 +60,47 @@ public class GrpcSendingMailbox implements SendingMailbox {
   @Override
   public void send(TransferableBlock block)
       throws IOException {
+    if (isTerminated()) {
+      return;
+    }
     if (_contentObserver == null) {
       _contentObserver = getContentObserver();
     }
-    Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is 
already closed", _id);
     _contentObserver.onNext(toMailboxContent(block));
   }
 
   @Override
   public void complete() {
+    if (isTerminated()) {
+      return;
+    }
     _contentObserver.onCompleted();
   }
 
   @Override
   public void cancel(Throwable t) {
-    if (!_statusObserver.isFinished()) {
-      LOGGER.debug("Cancelling mailbox: {}", _id);
-      if (_contentObserver == null) {
-        _contentObserver = getContentObserver();
-      }
-      try {
-        // NOTE: DO NOT use onError() because it will terminate the stream, 
and receiver might not get the callback
-        
_contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
-            new RuntimeException("Cancelled by sender with exception: " + 
t.getMessage(), t))));
-        _contentObserver.onCompleted();
-      } catch (Exception e) {
-        // Exception can be thrown if the stream is already closed, so we 
simply ignore it
-        LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
-      }
+    if (isTerminated()) {
+      return;
+    }
+    LOGGER.debug("Cancelling mailbox: {}", _id);
+    if (_contentObserver == null) {
+      _contentObserver = getContentObserver();
     }
+    try {
+      // NOTE: DO NOT use onError() because it will terminate the stream, and 
receiver might not get the callback
+      
_contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
+          new RuntimeException("Cancelled by sender with exception: " + 
t.getMessage(), t))));
+      _contentObserver.onCompleted();
+    } catch (Exception e) {
+      // Exception can be thrown if the stream is already closed, so we simply 
ignore it
+      LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
+    }
+  }
+
+  @Override
+  public boolean isTerminated() {
+    // TODO: We cannot differentiate early termination vs stream error
+    return _statusObserver.isFinished();
   }
 
   private StreamObserver<MailboxContent> getContentObserver() {
@@ -102,7 +113,6 @@ public class GrpcSendingMailbox implements SendingMailbox {
     DataBlock dataBlock = block.getDataBlock();
     byte[] bytes = dataBlock.toBytes();
     ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes);
-    return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString)
-        .build();
+    return 
MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString).build();
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index fb96d62043..23943a2091 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -32,6 +32,7 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
   private final long _deadlineMs;
 
   private ReceivingMailbox _receivingMailbox;
+  private volatile boolean _isTerminated;
 
   public InMemorySendingMailbox(String id, MailboxService mailboxService, long 
deadlineMs) {
     _id = id;
@@ -41,12 +42,27 @@ public class InMemorySendingMailbox implements 
SendingMailbox {
 
   @Override
   public void send(TransferableBlock block) {
+    if (_isTerminated) {
+      return;
+    }
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
     }
     long timeoutMs = _deadlineMs - System.currentTimeMillis();
-    if (!_receivingMailbox.offer(block, timeoutMs)) {
-      throw new RuntimeException(String.format("Failed to offer block into 
mailbox: %s within: %dms", _id, timeoutMs));
+    ReceivingMailbox.ReceivingMailboxStatus status = 
_receivingMailbox.offer(block, timeoutMs);
+    switch (status) {
+      case SUCCESS:
+        break;
+      case ERROR:
+        throw new RuntimeException(String.format("Mailbox: %s already errored 
out (received error block before)", _id));
+      case TIMEOUT:
+        throw new RuntimeException(
+            String.format("Timed out adding block into mailbox: %s with 
timeout: %dms", _id, timeoutMs));
+      case EARLY_TERMINATED:
+        _isTerminated = true;
+        break;
+      default:
+        throw new IllegalStateException("Unsupported mailbox status: " + 
status);
     }
   }
 
@@ -56,6 +72,9 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
 
   @Override
   public void cancel(Throwable t) {
+    if (_isTerminated) {
+      return;
+    }
     LOGGER.debug("Cancelling mailbox: {}", _id);
     if (_receivingMailbox == null) {
       _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
@@ -63,4 +82,9 @@ public class InMemorySendingMailbox implements SendingMailbox 
{
     
_receivingMailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
         new RuntimeException("Cancelled by sender with exception: " + 
t.getMessage(), t)));
   }
+
+  @Override
+  public boolean isTerminated() {
+    return _isTerminated;
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index fcba7c0a3d..64b7c3f202 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -65,37 +65,41 @@ public class ReceivingMailbox {
    * Offers a non-error block into the mailbox within the timeout specified, 
returns whether the block is successfully
    * added. If the block is not added, an error block is added to the mailbox.
    */
-  public boolean offer(TransferableBlock block, long timeoutMs) {
-    if (_errorBlock.get() != null) {
+  public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) 
{
+    TransferableBlock errorBlock = _errorBlock.get();
+    if (errorBlock != null) {
       LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring 
the late block", _id);
-      return false;
+      return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.EARLY_TERMINATED
+          : ReceivingMailboxStatus.ERROR;
     }
-    if (timeoutMs < 0) {
+    if (timeoutMs <= 0) {
       LOGGER.debug("Mailbox: {} is already timed out", _id);
       setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
           new TimeoutException("Timed out while offering data to mailbox: " + 
_id)));
-      return false;
+      return ReceivingMailboxStatus.TIMEOUT;
     }
     try {
       if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
-        if (_errorBlock.get() == null) {
+        errorBlock = _errorBlock.get();
+        if (errorBlock == null) {
           _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
-          return true;
+          return ReceivingMailboxStatus.SUCCESS;
         } else {
           LOGGER.debug("Mailbox: {} is already cancelled or errored out, 
ignoring the late block", _id);
           _blocks.clear();
-          return false;
+          return errorBlock == CANCELLED_ERROR_BLOCK ? 
ReceivingMailboxStatus.EARLY_TERMINATED
+              : ReceivingMailboxStatus.ERROR;
         }
       } else {
         LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", 
_id, timeoutMs);
         setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
             new TimeoutException("Timed out while waiting for receive operator 
to consume data from mailbox: " + _id)));
-        return false;
+        return ReceivingMailboxStatus.TIMEOUT;
       }
     } catch (InterruptedException e) {
       LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
       setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
-      return false;
+      return ReceivingMailboxStatus.ERROR;
     }
   }
 
@@ -133,4 +137,8 @@ public class ReceivingMailbox {
   public int getNumPendingBlocks() {
     return _blocks.size();
   }
+
+  public enum ReceivingMailboxStatus {
+    SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 3a794260b7..68b4f958cd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -55,4 +55,10 @@ public interface SendingMailbox {
    * No more blocks can be sent after calling this method.
    */
   void cancel(Throwable t);
+
+  /**
+   * Returns whether the {@link ReceivingMailbox} is already closed. There is 
no need to send more blocks after the
+   * mailbox is terminated.
+   */
+  boolean isTerminated();
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index d9509d5766..9074d81151 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -77,13 +77,27 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
       }
 
       long timeoutMs = 
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
-      if (_mailbox.offer(block, timeoutMs)) {
-        
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
-            .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
-                Integer.toString(_mailbox.getNumPendingBlocks())).build());
-      } else {
-        LOGGER.warn("Failed to add block into mailbox: {} within timeout: 
{}ms", mailboxId, timeoutMs);
-        cancelStream();
+      ReceivingMailbox.ReceivingMailboxStatus status = _mailbox.offer(block, 
timeoutMs);
+      switch (status) {
+        case SUCCESS:
+          
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
+              .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
+                  Integer.toString(_mailbox.getNumPendingBlocks())).build());
+          break;
+        case ERROR:
+          LOGGER.warn("Mailbox: {} already errored out (received error block 
before)", mailboxId);
+          cancelStream();
+          break;
+        case TIMEOUT:
+          LOGGER.warn("Timed out adding block into mailbox: {} with timeout: 
{}ms", mailboxId, timeoutMs);
+          cancelStream();
+          break;
+        case EARLY_TERMINATED:
+          LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
+          onCompleted();
+          break;
+        default:
+          throw new IllegalStateException("Unsupported mailbox status: " + 
status);
       }
     } catch (Exception e) {
       String errorMessage = "Caught exception while processing blocks for 
mailbox: " + mailboxId;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 64e4dc31ae..3465f94602 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.runtime.blocks;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,12 +46,6 @@ public class TransferableBlock implements Block {
   private List<Object[]> _container;
 
   public TransferableBlock(List<Object[]> container, DataSchema dataSchema, 
DataBlock.Type containerType) {
-    this(container, dataSchema, containerType, false);
-  }
-
-  @VisibleForTesting
-  TransferableBlock(List<Object[]> container, DataSchema dataSchema, 
DataBlock.Type containerType,
-      boolean isErrorBlock) {
     _container = container;
     _dataSchema = dataSchema;
     _type = containerType;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index c5e1c71f5f..78bc02351f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -40,6 +40,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,37 +119,40 @@ public class MailboxSendOperator extends 
MultiStageOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    boolean canContinue = true;
-    TransferableBlock transferableBlock;
     try {
-      transferableBlock = _sourceOperator.nextBlock();
-      if (transferableBlock.isNoOpBlock()) {
-        return transferableBlock;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
-          // Stats need to be populated here because the block is being sent 
to the mailbox
-          // and the receiving opChain will not be able to access the stats 
from the previous opChain
-          TransferableBlock eosBlockWithStats = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(
-              
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
-          sendTransferableBlock(eosBlockWithStats);
-        } else {
-          sendTransferableBlock(transferableBlock);
-        }
-      } else { // normal blocks
-        // check whether we should continue depending on exchange queue 
condition.
-        canContinue = sendTransferableBlock(transferableBlock);
+      TransferableBlock block = _sourceOperator.nextBlock();
+      if (block.isNoOpBlock()) {
+        return block;
+      } else if (block.isErrorBlock()) {
+        sendTransferableBlock(block);
+        return block;
+      } else if (block.isSuccessfulEndOfStreamBlock()) {
+        // Stats need to be populated here because the block is being sent to 
the mailbox
+        // and the receiving opChain will not be able to access the stats from 
the previous opChain
+        TransferableBlock eosBlockWithStats = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(
+            
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+        sendTransferableBlock(eosBlockWithStats);
+        return block;
+      } else {
+        // Data block
+        boolean canContinue = sendTransferableBlock(block);
+        // Yield if we cannot continue to put transferable block into the 
sending queue
+        return canContinue ? block : 
TransferableBlockUtils.getNoOpTransferableBlock();
       }
+    } catch (EarlyTerminationException e) {
+      // TODO: Query stats are not sent when opChain is early terminated
+      LOGGER.debug("Early terminating opChain: " + _context.getId());
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
     } catch (Exception e) {
-      transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+      TransferableBlock errorBlock = 
TransferableBlockUtils.getErrorTransferableBlock(e);
       try {
         LOGGER.error("Exception while transferring data on opChain: " + 
_context.getId(), e);
-        sendTransferableBlock(transferableBlock);
+        sendTransferableBlock(errorBlock);
       } catch (Exception e2) {
         LOGGER.error("Exception while sending error block.", e2);
       }
+      return errorBlock;
     }
-    // yield if we cannot continue to put transferable block into the sending 
queue
-    return canContinue ? transferableBlock : 
TransferableBlockUtils.getNoOpTransferableBlock();
   }
 
   private boolean sendTransferableBlock(TransferableBlock block)
@@ -157,7 +161,8 @@ public class MailboxSendOperator extends MultiStageOperator 
{
     if (_exchange.offerBlock(block, timeoutMs)) {
       return _exchange.getRemainingCapacity() > 0;
     } else {
-      throw new TimeoutException("Timeout while offering data block into the 
sending queue.");
+      throw new TimeoutException(
+          String.format("Timed out while offering block into the sending queue 
after %dms", timeoutMs));
     }
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 0f3fef5208..7e0b6f8e4c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -84,7 +84,7 @@ public class SortOperator extends MultiStageOperator {
     // - 'isInputSorted' is set to true indicating that the data was already 
sorted
     if (collationKeys.isEmpty() || isInputSorted) {
       _priorityQueue = null;
-      _rows = new ArrayList<>();
+      _rows = new ArrayList<>(Math.min(defaultHolderCapacity, _numRowsToKeep));
     } else {
       // Use the opposite direction as specified by the collation directions 
since we need the PriorityQueue to decide
       // which elements to keep and which to remove based on the limits.
@@ -160,7 +160,7 @@ public class SortOperator extends MultiStageOperator {
         if (block.isErrorBlock()) {
           _upstreamErrorBlock = block;
           return;
-        } else if (TransferableBlockUtils.isEndOfStream(block)) {
+        } else if (block.isSuccessfulEndOfStreamBlock()) {
           _readyToConstruct = true;
           return;
         }
@@ -168,11 +168,16 @@ public class SortOperator extends MultiStageOperator {
         List<Object[]> container = block.getContainer();
         if (_priorityQueue == null) {
           // TODO: when push-down properly, we shouldn't get more than 
_numRowsToKeep
-          if (_rows.size() <= _numRowsToKeep) {
-            if (_rows.size() + container.size() <= _numRowsToKeep) {
+          int numRows = _rows.size();
+          if (numRows < _numRowsToKeep) {
+            if (numRows + container.size() < _numRowsToKeep) {
               _rows.addAll(container);
             } else {
-              _rows.addAll(container.subList(0, _numRowsToKeep - 
_rows.size()));
+              _rows.addAll(container.subList(0, _numRowsToKeep - numRows));
+              LOGGER.debug("Early terminate at SortOperator - operatorId={}, 
opChainId={}", _operatorId,
+                  _context.getId());
+              _readyToConstruct = true;
+              return;
             }
           }
         } else {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index b7a152df02..e47d9835fa 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,6 +95,16 @@ public abstract class BlockExchange {
 
   public boolean offerBlock(TransferableBlock block, long timeoutMs)
       throws Exception {
+    boolean isEarlyTerminated = true;
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (!sendingMailbox.isTerminated()) {
+        isEarlyTerminated = false;
+        break;
+      }
+    }
+    if (isEarlyTerminated) {
+      throw new EarlyTerminationException();
+    }
     return _queue.offer(block, timeoutMs, TimeUnit.MILLISECONDS);
   }
 
@@ -111,8 +122,8 @@ public abstract class BlockExchange {
       }
       block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
       if (block == null) {
-        block = TransferableBlockUtils.getErrorTransferableBlock(
-            new TimeoutException("Timed out on exchange for opChain: " + 
_opChainId));
+        block = TransferableBlockUtils.getErrorTransferableBlock(new 
TimeoutException(
+            String.format("Timed out polling block for opChain: %s after 
%dms", _opChainId, timeoutMs)));
       } else {
         // Notify that the block exchange can now accept more blocks.
         _callback.accept(_opChainId);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index 0b77996c22..9e372b91b9 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -506,13 +506,15 @@ public class MailboxServiceTest {
     Thread.sleep(deadlineMs - System.currentTimeMillis() + 10);
     receiveMailLatch.await();
     assertEquals(numCallbacks.get(), 2);
-    try {
-      sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{1}));
-      fail("Expect exception when sending data after timing out");
-    } catch (Exception e) {
-      // Expected
-    }
-    assertEquals(numCallbacks.get(), 2);
+    // TODO: Currently we cannot differentiate early termination vs stream 
error
+    assertTrue(sendingMailbox.isTerminated());
+//    try {
+//      sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new 
Object[]{1}));
+//      fail("Expect exception when sending data after timing out");
+//    } catch (Exception e) {
+//      // Expected
+//    }
+//    assertEquals(numCallbacks.get(), 2);
 
     // Data blocks will be cleaned up
     ReceivingMailbox receivingMailbox = 
_mailboxService1.getReceivingMailbox(mailboxId);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to