gortiz commented on code in PR #14507:
URL: https://github.com/apache/pinot/pull/14507#discussion_r1895514526


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -79,8 +79,48 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i
     _exchange = exchangeFactory.apply(_statMap);
   }
 
+  /**
+   * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
+   *
+   * In normal cases, where the sender sends data to a single receiver stage, this method just delegates on
+   * {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}.
+   *
+   * In case of a multi-sender node, this method creates a two steps exchange:
+   * <ol>
+   *   <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
+   *   distribution type specified in the {@link MailboxSendNode}.</li>
+   *   <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
+   * </ol>
+   *
+   * @see BlockExchange#asSendingMailbox(String)
+   */
+  private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node,
+      StatMap<StatKey> statMap) {
+    BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock;
+    if (!node.isMultiSend()) {
+      // it is guaranteed that there is exactly one receiver stage
+      int receiverStageId = node.getReceiverStageIds().iterator().next();
+      return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+    }
+    List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
+    // The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;

Review Comment:
   Yes, we split the block in the outer `BlockExchangeSendingMailbox`, so the 
inner ones (the ones that actually send to other stages) don't need to split 
it again. In the future we may want to split differently depending on the 
mailbox type: in-memory mailboxes don't need splitting, while for gRPC 
mailboxes it may be useful. In that case we would need to change this code. 
But given that we currently split the same way for all mailboxes, we can do 
it only once.
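
   To make the "split once" idea concrete, here is a minimal, self-contained 
sketch. It is not the Pinot code: `Splitter`, `InnerExchange`, `chunk` and 
`broadcast` are hypothetical stand-ins for `BlockSplitter`, the per-stage 
exchanges and the outer broadcast exchange, and blocks are modeled as plain 
lists of integers. It only shows that when the outer step splits, inner 
exchanges configured with a no-op splitter forward the chunks unchanged.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOnceSketch {
  /** Hypothetical stand-in for a block splitter: cuts a block into chunks. */
  interface Splitter {
    List<List<Integer>> split(List<Integer> block, int maxRows);

    /** Returns the block untouched as a single chunk (the no-op case). */
    Splitter NO_OP = (block, maxRows) -> List.of(block);
  }

  /** Splits a block into chunks of at most maxRows rows. */
  static List<List<Integer>> chunk(List<Integer> block, int maxRows) {
    List<List<Integer>> chunks = new ArrayList<>();
    for (int i = 0; i < block.size(); i += maxRows) {
      int end = Math.min(i + maxRows, block.size());
      chunks.add(new ArrayList<>(block.subList(i, end)));
    }
    return chunks;
  }

  /** Hypothetical inner exchange: records what it would send to one receiver stage. */
  static final class InnerExchange {
    final Splitter splitter;
    final List<List<Integer>> sent = new ArrayList<>();

    InnerExchange(Splitter splitter) {
      this.splitter = splitter;
    }

    void send(List<Integer> block, int maxRows) {
      // With NO_OP this adds the incoming chunk as-is.
      sent.addAll(splitter.split(block, maxRows));
    }
  }

  /** Hypothetical outer broadcast: splits once, then fans every chunk out to all inner exchanges. */
  static void broadcast(List<Integer> block, int maxRows, Splitter outer, List<InnerExchange> inners) {
    for (List<Integer> piece : outer.split(block, maxRows)) {
      for (InnerExchange inner : inners) {
        inner.send(piece, maxRows);
      }
    }
  }

  public static void main(String[] args) {
    List<InnerExchange> inners =
        List.of(new InnerExchange(Splitter.NO_OP), new InnerExchange(Splitter.NO_OP));
    broadcast(List.of(1, 2, 3, 4, 5), 2, SplitOnceSketch::chunk, inners);
    for (InnerExchange inner : inners) {
      System.out.println(inner.sent);
    }
  }
}
```

   If mailboxes ever needed different splitting strategies (for example none 
for in-memory and size-based for gRPC), the no-op would move to the outer step 
and each inner exchange would get its own splitter instead.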



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org