gortiz commented on code in PR #14507:
URL: https://github.com/apache/pinot/pull/14507#discussion_r1895514526
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -79,8 +79,48 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i
     _exchange = exchangeFactory.apply(_statMap);
   }
 
+  /**
+   * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
+   *
+   * In normal cases, where the sender sends data to a single receiver stage, this method just delegates on
+   * {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}.
+   *
+   * In case of a multi-sender node, this method creates a two steps exchange:
+   * <ol>
+   *   <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
+   *   distribution type specified in the {@link MailboxSendNode}.</li>
+   *   <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
+   * </ol>
+   *
+   * @see BlockExchange#asSendingMailbox(String)
+   */
+  private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node,
+      StatMap<StatKey> statMap) {
+    BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock;
+    if (!node.isMultiSend()) {
+      // it is guaranteed that there is exactly one receiver stage
+      int receiverStageId = node.getReceiverStageIds().iterator().next();
+      return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+    }
+    List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
+    // The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;

Review Comment:
Yes, we split the block in the outer `BlockExchangeSendingMailbox`, so the inner ones (the ones that actually send to other stages) don't need to split it again.
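The split-once arrangement can be illustrated with a minimal Java sketch under a heavily simplified model. Everything here is hypothetical: `BlockSplitter` is reduced to a two-argument lambda, `CHUNKING`, `InnerExchange` and `OuterBroadcastExchange` are invented stand-ins, blocks are modeled as lists of integer rows, and "sending" just records chunks. None of it is Pinot's actual API. The outer exchange splits each block once and fans every chunk out; the inner exchanges use a pass-through splitter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a two-step exchange. These types are
// illustrative stand-ins, NOT Pinot's actual BlockSplitter/BlockExchange API.
public class TwoStepExchangeSketch {

  /** Splits a block (modeled as a list of rows) into the chunks to send. */
  interface BlockSplitter {
    List<List<Integer>> split(List<Integer> block, int maxRows);

    // Pass-through splitter: forwards the block as a single, unchanged chunk.
    BlockSplitter NO_OP = (block, maxRows) -> List.of(block);
  }

  // Splitter that cuts a block into chunks of at most maxRows rows.
  static final BlockSplitter CHUNKING = (block, maxRows) -> {
    List<List<Integer>> chunks = new ArrayList<>();
    for (int i = 0; i < block.size(); i += maxRows) {
      chunks.add(block.subList(i, Math.min(i + maxRows, block.size())));
    }
    return chunks;
  };

  /** Inner exchange: "sends" to one receiver stage by recording each chunk. */
  static final class InnerExchange {
    final BlockSplitter splitter;
    final List<List<Integer>> sent = new ArrayList<>();

    InnerExchange(BlockSplitter splitter) {
      this.splitter = splitter;
    }

    void send(List<Integer> block, int maxRows) {
      sent.addAll(splitter.split(block, maxRows));
    }
  }

  /** Outer broadcast exchange: splits each block once, then fans every chunk out. */
  static final class OuterBroadcastExchange {
    final BlockSplitter splitter;
    final List<InnerExchange> inners;

    OuterBroadcastExchange(BlockSplitter splitter, List<InnerExchange> inners) {
      this.splitter = splitter;
      this.inners = inners;
    }

    void send(List<Integer> block, int maxRows) {
      for (List<Integer> chunk : splitter.split(block, maxRows)) {
        for (InnerExchange inner : inners) {
          // Chunks already respect maxRows, so inner NO_OP splitters forward them as-is.
          inner.send(chunk, maxRows);
        }
      }
    }
  }

  public static void main(String[] args) {
    InnerExchange stageA = new InnerExchange(BlockSplitter.NO_OP);
    InnerExchange stageB = new InnerExchange(BlockSplitter.NO_OP);
    OuterBroadcastExchange outer = new OuterBroadcastExchange(CHUNKING, List.of(stageA, stageB));
    outer.send(List.of(1, 2, 3, 4, 5), 2);
    System.out.println(stageA.sent); // prints [[1, 2], [3, 4], [5]]
    System.out.println(stageB.sent); // prints [[1, 2], [3, 4], [5]]
  }
}
```

Because the chunks are produced once by the outer exchange, every receiver stage gets identical chunks without repeating the split work. If the inner mailboxes ever needed transport-specific splitting, their splitters would have to become something other than `NO_OP`.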
In the future, we may consider splitting differently depending on whether the mailbox is in-memory (where splitting is unnecessary) or gRPC (where it may be useful); in that case, this code would need to change. For now, since we split the same way for all mailboxes, we only need to do it once.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org