yashmayya commented on code in PR #14507:
URL: https://github.com/apache/pinot/pull/14507#discussion_r1895308029


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -84,8 +92,19 @@ public boolean send(TransferableBlock block)
     if (block.isSuccessfulEndOfStreamBlock()) {
       // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes
       int numMailboxes = _sendingMailboxes.size();
-      int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
-      assert block.getQueryStats() != null;
+      int mailboxIdToSendMetadata;
+      if (block.getQueryStats() != null) {
+        mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Sending EOS metadata. Only mailbox #{} will get stats", mailboxIdToSendMetadata);
+        }
+      } else {
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("Sending EOS metadata. No stat will be sent");
+        }
+        // this may happen when the block exchange is itself used as a sending mailbox, like when using spools
+        mailboxIdToSendMetadata = -1;

Review Comment:
   I'm not sure I followed this part - shouldn't the block exchange sending 
mailbox send the query stats to all the mailboxes in the next layer where the 
actual random mailbox picking can happen (as an aside, I didn't quite get the 
purpose of this random mailbox picking logic for query stats either)?
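
   A minimal stand-alone sketch of the fan-out the diff implements (hypothetical names, not the actual BlockExchange/SendingMailbox APIs): the stats-bearing EOS block goes to exactly one randomly picked mailbox, presumably so the stats are aggregated only once downstream, while the remaining mailboxes receive an empty EOS.

   ```java
   import java.util.List;
   import java.util.concurrent.ThreadLocalRandom;
   import java.util.function.Consumer;

   public class EosFanOutSketch {
     // receivers, statsEos and emptyEos are hypothetical stand-ins for the sending
     // mailboxes, the stats-bearing EOS block and the empty EOS block respectively.
     // Assumes receivers is non-empty.
     static <B> void fanOutEos(List<Consumer<B>> receivers, B statsEos, B emptyEos) {
       int statsReceiver = ThreadLocalRandom.current().nextInt(receivers.size());
       for (int i = 0; i < receivers.size(); i++) {
         // exactly one mailbox carries the stats; the others get an empty EOS
         receivers.get(i).accept(i == statsReceiver ? statsEos : emptyEos);
       }
     }
   }
   ```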



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -180,8 +185,12 @@ private E readDroppingSuccessEos() {
       // this is done in order to keep the invariant.
       _lastRead--;
       if (LOGGER.isDebugEnabled()) {
+        String ids = _mailboxes.stream()
+            .map(AsyncStream::getId)
+            .map(Object::toString)
+            .collect(Collectors.joining(","));
         LOGGER.debug("==[RECEIVE]== EOS received : " + _id + " in mailbox: " + removed.getId()
-            + " (" + _mailboxes.size() + " mailboxes alive)");
+            + " (" + ids + " mailboxes alive)");

Review Comment:
   ```suggestion
               + " (Mailboxes alive: " + ids + ")");
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java:
##########
@@ -209,22 +209,33 @@ public Set<ColocationKey> visitMailboxSend(MailboxSendNode node, GreedyShuffleRe
 
     boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, distributionKeys);
     // If receiver is not a join-stage, then we can determine distribution type now.
-    if (!context.isJoinStage(node.getReceiverStageId())) {
-      Set<ColocationKey> colocationKeys;
-      if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) {
-        // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
+    boolean sendsToJoin = false;
+    boolean allAreSuperSet = true;
+    for (Integer receiverStageId : node.getReceiverStageIds()) {
+      if (context.isJoinStage(receiverStageId)) {
+        sendsToJoin = true;
+        break;
+      }

Review Comment:
   We're only checking if at least one receiver stage is a join stage here?
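
   To make the any-vs-all distinction concrete, a small hedged sketch (hypothetical names, not Pinot APIs): the loop above answers "does at least one receiver stage join?", not "do all of them?".

   ```java
   import java.util.Set;
   import java.util.function.Predicate;

   public class ReceiverCheckSketch {
     // isJoinStage stands in for context::isJoinStage; receiverStageIds for node.getReceiverStageIds().
     static boolean sendsToAnyJoinStage(Set<Integer> receiverStageIds, Predicate<Integer> isJoinStage) {
       return receiverStageIds.stream().anyMatch(isJoinStage);   // what the loop with break computes
     }

     static boolean sendsOnlyToJoinStages(Set<Integer> receiverStageIds, Predicate<Integer> isJoinStage) {
       return receiverStageIds.stream().allMatch(isJoinStage);   // the stricter alternative
     }
   }
   ```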



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -137,4 +165,66 @@ public void cancel(Throwable t) {
       sendingMailbox.cancel(t);
     }
   }
+
+  public SendingMailbox asSendingMailbox(String id) {
+    return new BlockExchangeSendingMailbox(id);
+  }
+
+  /**
+   * A mailbox that sends data blocks to a {@link org.apache.pinot.query.runtime.operator.exchange.BlockExchange}.
+   *
+   * BlockExchanges send data to a list of {@link SendingMailbox}es, which are responsible for sending the data
+   * to the corresponding {@link ReceivingMailbox}es. This class applies the decorator pattern to expose a BlockExchange
+   * as a SendingMailbox, open the possibility of having a BlockExchange as a destination for another BlockExchange.
+   *
+   * This is useful for example when a send operator has to send data to more than one stage. We need to broadcast the
+   * data to all the stages (the first BlockExchange). Then for each stage, we need to send the data to the
+   * corresponding workers (the inner BlockExchange). The inner BlockExchange may send data using a different
+   * distribution strategy.
+   *
+   * @see MailboxSendNode#isMultiSend()}
+   */
+  private class BlockExchangeSendingMailbox implements SendingMailbox {

Review Comment:
   Neat pattern 🙂 
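
   For readers unfamiliar with the pattern, a minimal stand-alone illustration of the idea (hypothetical types, not the real SendingMailbox/BlockExchange interfaces): an exchange that fans data out to several mailboxes can itself be wrapped to look like a single mailbox, which is what lets exchanges nest.

   ```java
   import java.util.List;

   // Hypothetical, simplified types; the real interfaces carry more methods and checked exceptions.
   interface Mailbox<B> {
     void send(B block);
     void complete();
   }

   final class Exchange<B> {
     private final List<Mailbox<B>> _destinations;

     Exchange(List<Mailbox<B>> destinations) {
       _destinations = destinations;
     }

     void send(B block) {
       for (Mailbox<B> mailbox : _destinations) {
         mailbox.send(block);
       }
     }

     /** Decorator: expose the whole exchange wherever a single Mailbox is expected. */
     Mailbox<B> asMailbox() {
       return new Mailbox<B>() {
         @Override
         public void send(B block) {
           Exchange.this.send(block);
         }

         @Override
         public void complete() {
           _destinations.forEach(Mailbox::complete);
         }
       };
     }
   }
   ```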



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -79,8 +79,48 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i
     _exchange = exchangeFactory.apply(_statMap);
   }
 
+  /**
+   * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
+   *
+   * In normal cases, where the sender sends data to a single receiver stage, this method just delegates on
+   * {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}.
+   *
+   * In case of a multi-sender node, this method creates a two steps exchange:
+   * <ol>
+   *   <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
+   *   distribution type specified in the {@link MailboxSendNode}.</li>
+   *   <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
+   * </ol>
+   * </ol>
+   *
+   * @see BlockExchange#asSendingMailbox(String)
+   */
+  private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node,
+      StatMap<StatKey> statMap) {
+    BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock;
+    if (!node.isMultiSend()) {
+      // it is guaranteed that there is exactly one receiver stage
+      int receiverStageId = node.getReceiverStageIds().iterator().next();
+      return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+    }
+    List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
+    // The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;

Review Comment:
   Ah wait nvm, it's a little confusing but IIUC we're using the actual block 
splitter only in the single outer broadcast exchange to avoid splitting the 
blocks for every single exchange?
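
   A minimal stand-alone sketch of that reading (hypothetical names, not the real BlockExchange/BlockSplitter APIs): the outer broadcast applies the real splitter once, and the per-stage exchanges only forward the already-split chunks, so their splitter can safely be a no-op.

   ```java
   import java.util.List;
   import java.util.function.Consumer;

   public class TwoLevelSplitSketch {
     // Splitter stands in for BlockSplitter; innerExchanges for the per-stage exchanges.
     interface Splitter<B> {
       List<B> split(B block);
     }

     static <B> void broadcast(B block, Splitter<B> outerSplitter, List<Consumer<B>> innerExchanges) {
       for (B chunk : outerSplitter.split(block)) {  // blocks are split exactly once, at the outer level
         for (Consumer<B> inner : innerExchanges) {
           inner.accept(chunk);                      // inner exchanges receive pre-split chunks
         }
       }
     }
   }
   ```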



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -110,10 +129,16 @@ public boolean send(TransferableBlock block)
   }
 
   protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
-      throws Exception {
+      throws IOException, TimeoutException {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Sending block: {} {} to {}", block.getType(), System.identityHashCode(block), sendingMailbox);

Review Comment:
   Why `identityHashCode` and not the `toString` representation?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -79,8 +79,48 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i
     _exchange = exchangeFactory.apply(_statMap);
   }
 
+  /**
+   * Creates a {@link BlockExchange} for the given {@link MailboxSendNode}.
+   *
+   * In normal cases, where the sender sends data to a single receiver stage, this method just delegates on
+   * {@link #getBlockExchange(OpChainExecutionContext, int, RelDistribution.Type, List, StatMap, BlockSplitter)}.
+   *
+   * In case of a multi-sender node, this method creates a two steps exchange:
+   * <ol>
+   *   <li>One inner exchange is created for each receiver stage, using the method mentioned above and keeping the
+   *   distribution type specified in the {@link MailboxSendNode}.</li>
+   *   <li>Then, a single outer broadcast exchange is created to fan out the data to all the inner exchanges.</li>
+   * </ol>
+   * </ol>
+   *
+   * @see BlockExchange#asSendingMailbox(String)
+   */
+  private static BlockExchange getBlockExchange(OpChainExecutionContext ctx, MailboxSendNode node,
+      StatMap<StatKey> statMap) {
+    BlockSplitter mainSplitter = TransferableBlockUtils::splitBlock;
+    if (!node.isMultiSend()) {
+      // it is guaranteed that there is exactly one receiver stage
+      int receiverStageId = node.getReceiverStageIds().iterator().next();
+      return getBlockExchange(ctx, receiverStageId, node.getDistributionType(), node.getKeys(), statMap, mainSplitter);
+    }
+    List<SendingMailbox> perStageSendingMailboxes = new ArrayList<>();
+    // The inner splitter is a NO_OP because the outer splitter will take care of splitting the blocks
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;
+    BlockSplitter innerSplitter = BlockSplitter.NO_OP;

Review Comment:
   The definition of inner and outer (exchanges / block splitters) seems to be 
flipped here versus the doc comment above?


