Jackie-Jiang commented on code in PR #11746:
URL: https://github.com/apache/pinot/pull/11746#discussion_r1355940162


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -49,6 +50,8 @@ public class ReceivingMailbox {
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+  private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false);

Review Comment:
   (minor) This can be defined as a volatile boolean
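
As a rough illustration of the point above (not code from the PR): a flag that is only ever set and read does not need compare-and-set, so a `volatile boolean` gives the same cross-thread visibility without the extra object. The class name `EarlyTerminationFlagSketch` and the `tryClaim()` method are hypothetical and exist only to contrast the two cases.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the mailbox classes; only the flag handling is illustrated.
public class EarlyTerminationFlagSketch {
  // A write to a volatile field is visible to later reads from other threads.
  private volatile boolean _isEarlyTerminated;

  // AtomicBoolean is only needed for an atomic read-modify-write such as compareAndSet.
  private final AtomicBoolean _claimed = new AtomicBoolean();

  public void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  public boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }

  // Returns true for the first caller only; this is the case that needs AtomicBoolean.
  public boolean tryClaim() {
    return _claimed.compareAndSet(false, true);
  }
}
```

If the flag is ever flipped conditionally ("set it only if nobody else has"), the `AtomicBoolean` form is still the appropriate one.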



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java:
##########
@@ -34,12 +34,20 @@ public class MailboxStatusObserver implements StreamObserver<MailboxStatus> {
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
 
   private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+  private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();

Review Comment:
   (minor) This can be defined as a volatile boolean



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,19 +118,24 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       TransferableBlock block = _sourceOperator.nextBlock();
+      boolean isEarlyTerminated;
       if (block.isSuccessfulEndOfStreamBlock()) {
         // Stats need to be populated here because the block is being sent to the mailbox
         // and the receiving opChain will not be able to access the stats from the previous opChain
         TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
             OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
         sendTransferableBlock(eosBlockWithStats);
+        // when sending an EOS block already, early termination flag is ignored even if receiver has requested it.
+        isEarlyTerminated = false;
       } else {
-        sendTransferableBlock(block);
+        isEarlyTerminated = sendTransferableBlock(block);

Review Comment:
   This can be simplified by removing `isEarlyTerminated`:
   ```suggestion
           if (sendTransferableBlock(block)) {
             earlyTerminate();
           }
   ```
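
To show how the suggested shape reads in context, here is a minimal sketch under simplifying assumptions. `SendOperatorSketch`, `handleBlock`, and the `String` block type are hypothetical stand-ins for `MailboxSendOperator`, `getNextBlock`, and `TransferableBlock`. The only behaviour taken from the diff is that `sendTransferableBlock` returns `true` when the receiver has requested early termination.

```java
// Hypothetical stand-in for MailboxSendOperator; types and bodies are simplified.
public class SendOperatorSketch {
  private final boolean _receiverRequestedEarlyTermination;
  private int _earlyTerminateCalls;

  public SendOperatorSketch(boolean receiverRequestedEarlyTermination) {
    _receiverRequestedEarlyTermination = receiverRequestedEarlyTermination;
  }

  // Assumed contract: returns true when the receiver has asked for early termination.
  private boolean sendTransferableBlock(String block) {
    return _receiverRequestedEarlyTermination;
  }

  private void earlyTerminate() {
    _earlyTerminateCalls++;
  }

  public void handleBlock(String block, boolean isEndOfStream) {
    if (isEndOfStream) {
      // The stream is already ending, so the return value is deliberately ignored.
      sendTransferableBlock(block);
    } else if (sendTransferableBlock(block)) {
      // No local flag needed: react to the return value directly.
      earlyTerminate();
    }
  }

  public int getEarlyTerminateCalls() {
    return _earlyTerminateCalls;
  }
}
```

With this shape the EOS branch needs no `isEarlyTerminated = false` assignment, because there is no variable left to reset.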



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -71,6 +71,16 @@ public void cancel(Throwable t) {
     cancelRemainingMailboxes();
   }
 
+  public void earlyTerminate() {
+    earlyTerminateMailboxes();
+  }
+
+  protected void earlyTerminateMailboxes() {

Review Comment:
   Is there a specific reason for making this a separate method?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -70,11 +74,12 @@ public void send(TransferableBlock block)
 
   @Override
   public void complete() {
+    _isTerminated = true;
   }
 
   @Override
   public void cancel(Throwable t) {
-    if (_isTerminated) {
+    if (_isEarlyTerminated || _isTerminated) {

Review Comment:
   We should not change this. Cancel should still be applied even if early termination has been requested.
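
One way to picture the requested behaviour, as a sketch only: the guard in `cancel` checks just the terminated flag, so a mailbox that was early-terminated but not completed still records the cancellation. `SendingMailboxSketch` and `getCancelCause()` are hypothetical, and the assumption that `cancel` itself marks the mailbox terminated is not confirmed by the diff.

```java
// Hypothetical stand-in for InMemorySendingMailbox; only the guard logic is illustrated.
public class SendingMailboxSketch {
  private volatile boolean _isTerminated;
  private volatile boolean _isEarlyTerminated;
  private volatile Throwable _cancelCause;

  public void earlyTerminate() {
    _isEarlyTerminated = true;
  }

  public void complete() {
    _isTerminated = true;
  }

  public void cancel(Throwable t) {
    // Only a terminated mailbox skips cancellation; early termination alone does not.
    if (_isTerminated) {
      return;
    }
    _isTerminated = true; // assumption: cancelling also terminates the mailbox
    _cancelCause = t;
  }

  public boolean isEarlyTerminated() {
    return _isEarlyTerminated;
  }

  public Throwable getCancelCause() {
    return _cancelCause;
  }
}
```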



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

