Jackie-Jiang commented on code in PR #11205: URL: https://github.com/apache/pinot/pull/11205#discussion_r1293157495
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java: ########## @@ -118,46 +118,52 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - boolean canContinue = true; TransferableBlock transferableBlock; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId()); + } try { transferableBlock = _sourceOperator.nextBlock(); - if (transferableBlock.isNoOpBlock()) { - return transferableBlock; - } else if (transferableBlock.isEndOfStreamBlock()) { - if (transferableBlock.isSuccessfulEndOfStreamBlock()) { - // Stats need to be populated here because the block is being sent to the mailbox - // and the receiving opChain will not be able to access the stats from the previous opChain - TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( - OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); - sendTransferableBlock(eosBlockWithStats); - } else { - sendTransferableBlock(transferableBlock); - } - } else { // normal blocks - // check whether we should continue depending on exchange queue condition. - canContinue = sendTransferableBlock(transferableBlock); + if (transferableBlock.isSuccessfulEndOfStreamBlock()) { + // Stats need to be populated here because the block is being sent to the mailbox + // and the receiving opChain will not be able to access the stats from the previous opChain + TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( + OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + sendTransferableBlock(eosBlockWithStats, false); + } else { + sendTransferableBlock(transferableBlock, true); } } catch (Exception e) { transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e); try { LOGGER.error("Exception while transferring data on opChain: " + _context.getId(), e); - sendTransferableBlock(transferableBlock); + sendTransferableBlock(transferableBlock, false); } catch (Exception e2) { LOGGER.error("Exception while sending error block.", e2); } } - // yield if we cannot continue to put transferable block into the sending queue - return canContinue ? transferableBlock : TransferableBlockUtils.getNoOpTransferableBlock(); + return transferableBlock; } - private boolean sendTransferableBlock(TransferableBlock block) + private void sendTransferableBlock(TransferableBlock block, boolean throwIfTimeout) Review Comment: IIUC, if it times out, we want to early terminate the operator, so throwing exception is the fastest option. Can you confirm @walterddr -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org