gortiz commented on code in PR #11205: URL: https://github.com/apache/pinot/pull/11205#discussion_r1289877958
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java: ########## @@ -118,46 +118,52 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - boolean canContinue = true; TransferableBlock transferableBlock; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("==[SEND]== Enter getNextBlock from: " + _context.getId()); + } try { transferableBlock = _sourceOperator.nextBlock(); - if (transferableBlock.isNoOpBlock()) { - return transferableBlock; - } else if (transferableBlock.isEndOfStreamBlock()) { - if (transferableBlock.isSuccessfulEndOfStreamBlock()) { - // Stats need to be populated here because the block is being sent to the mailbox - // and the receiving opChain will not be able to access the stats from the previous opChain - TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( - OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); - sendTransferableBlock(eosBlockWithStats); - } else { - sendTransferableBlock(transferableBlock); - } - } else { // normal blocks - // check whether we should continue depending on exchange queue condition. - canContinue = sendTransferableBlock(transferableBlock); + if (transferableBlock.isSuccessfulEndOfStreamBlock()) { + // Stats need to be populated here because the block is being sent to the mailbox + // and the receiving opChain will not be able to access the stats from the previous opChain + TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( + OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + sendTransferableBlock(eosBlockWithStats, false); + } else { + sendTransferableBlock(transferableBlock, true); } } catch (Exception e) { transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e); try { LOGGER.error("Exception while transferring data on opChain: " + _context.getId(), e); - sendTransferableBlock(transferableBlock); + sendTransferableBlock(transferableBlock, false); } catch (Exception e2) { LOGGER.error("Exception while sending error block.", e2); } } - // yield if we cannot continue to put transferable block into the sending queue - return canContinue ? transferableBlock : TransferableBlockUtils.getNoOpTransferableBlock(); + return transferableBlock; } - private boolean sendTransferableBlock(TransferableBlock block) + private void sendTransferableBlock(TransferableBlock block, boolean throwIfTimeout) Review Comment: The idea here is that sometimes we don't want to do that. Specially when we found an exception in the happy path. There we send the exception downstream as an error block. In case that error blocks times out... what do we want to do? If we decide to throw a timeout there we would hide the original exception. IIRC @walterddr's original code is the one that introduce this _I don't want to throw timeout_ parameter and I though it was a good idea. But we can discuss about that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org