Jackie-Jiang commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1293157495


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,46 +118,52 @@ public String toExplainString() {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    boolean canContinue = true;
     TransferableBlock transferableBlock;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId());
+    }
     try {
       transferableBlock = _sourceOperator.nextBlock();
-      if (transferableBlock.isNoOpBlock()) {
-        return transferableBlock;
-      } else if (transferableBlock.isEndOfStreamBlock()) {
-        if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
-          // Stats need to be populated here because the block is being sent 
to the mailbox
-          // and the receiving opChain will not be able to access the stats 
from the previous opChain
-          TransferableBlock eosBlockWithStats = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(
-              
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
-          sendTransferableBlock(eosBlockWithStats);
-        } else {
-          sendTransferableBlock(transferableBlock);
-        }
-      } else { // normal blocks
-        // check whether we should continue depending on exchange queue 
condition.
-        canContinue = sendTransferableBlock(transferableBlock);
+      if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+        // Stats need to be populated here because the block is being sent to 
the mailbox
+        // and the receiving opChain will not be able to access the stats from 
the previous opChain
+        TransferableBlock eosBlockWithStats = 
TransferableBlockUtils.getEndOfStreamTransferableBlock(
+            
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+        sendTransferableBlock(eosBlockWithStats, false);
+      } else {
+        sendTransferableBlock(transferableBlock, true);
       }
     } catch (Exception e) {
       transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
       try {
         LOGGER.error("Exception while transferring data on opChain: " + 
_context.getId(), e);
-        sendTransferableBlock(transferableBlock);
+        sendTransferableBlock(transferableBlock, false);
       } catch (Exception e2) {
         LOGGER.error("Exception while sending error block.", e2);
       }
     }
-    // yield if we cannot continue to put transferable block into the sending 
queue
-    return canContinue ? transferableBlock : 
TransferableBlockUtils.getNoOpTransferableBlock();
+    return transferableBlock;
   }
 
-  private boolean sendTransferableBlock(TransferableBlock block)
+  private void sendTransferableBlock(TransferableBlock block, boolean 
throwIfTimeout)

Review Comment:
   IIUC, if it times out, we want to early terminate the operator, so throwing 
exception is the fastest option. Can you confirm @walterddr 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to