yashmayya commented on code in PR #14495:
URL: https://github.com/apache/pinot/pull/14495#discussion_r1849912468


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java:
##########
@@ -126,7 +133,8 @@ public boolean equals(Object o) {
       return false;
     }
     MailboxReceiveNode that = (MailboxReceiveNode) o;
-    return _senderStageId == that._senderStageId && _sort == that._sort && _sortedOnSender == that._sortedOnSender
+    return getSenderStageId() == that.getSenderStageId() && Objects.equals(_sender, that._sender)

Review Comment:
   Looks like this change is causing all the test cases in `PlanNodeSerDeTest` 
to fail because the sender is not serialized into the plan.
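   To illustrate the failure mode, here is a minimal sketch with hypothetical toy classes (`ToySender`, `ToyReceive`, `ToySerDe`), not Pinot's `MailboxReceiveNode` or its serde code. It assumes, as the comment says, that only the sender's stage ID reaches the serialized plan. Once `equals()` also compares the sender object, a deserialized node (whose sender is `null`) can never equal the original.

```java
import java.util.Objects;

// Hypothetical stand-in for a send node; identity-based equals() on purpose.
final class ToySender {
  final int stageId;

  ToySender(int stageId) {
    this.stageId = stageId;
  }
}

// Hypothetical stand-in for a receive node.
final class ToyReceive {
  final int senderStageId;
  final ToySender sender; // never serialized, so null after a round trip

  ToyReceive(int senderStageId, ToySender sender) {
    this.senderStageId = senderStageId;
    this.sender = sender;
  }

  // Mirrors the changed equals(): it also compares the sender object.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ToyReceive)) {
      return false;
    }
    ToyReceive that = (ToyReceive) o;
    return senderStageId == that.senderStageId && Objects.equals(sender, that.sender);
  }

  @Override
  public int hashCode() {
    return Objects.hash(senderStageId, sender);
  }
}

// Hypothetical serde: only the sender stage id survives.
final class ToySerDe {
  static int serialize(ToyReceive node) {
    return node.senderStageId;
  }

  static ToyReceive deserialize(int senderStageId) {
    return new ToyReceive(senderStageId, null);
  }
}
```

   Under these assumptions any round-trip equality assertion fails, which is consistent with the reported `PlanNodeSerDeTest` behavior.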



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java:
##########
@@ -117,96 +117,120 @@ protected boolean traverseStageBoundary() {
       return true;
     }
 
+    /**
+     * The method that is called by default to handle a node that does not have a specific visit method.
+     *
+     * This method can be overridden to provide a default behavior for all nodes.
+     *
+     * The returned value of this method is ignored by default
+     */
+    protected T preChildren(PlanNode node, C context) {

Review Comment:
   Why not make this a `void` method then? There don't seem to be any planned future use cases for the return value here either.
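   A minimal sketch of the `void` variant, using hypothetical `ToyNode`, `ToyDepthFirstVisitor` and `NameCollector` classes rather than Pinot's `PlanNode` / `PlanNodeVisitor` API. Subclasses override the hook only for its side effects, so there is no ignored return value to document.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy node; Pinot's PlanNode hierarchy is much richer.
final class ToyNode {
  final String name;
  final List<ToyNode> inputs = new ArrayList<>();

  ToyNode(String name, ToyNode... children) {
    this.name = name;
    for (ToyNode child : children) {
      inputs.add(child);
    }
  }
}

// Depth-first visitor whose pre-children hook returns void. C is the context type.
abstract class ToyDepthFirstVisitor<C> {
  // Called before visiting the children of every node; a no-op by default.
  protected void preChildren(ToyNode node, C context) {
  }

  public final void visit(ToyNode node, C context) {
    preChildren(node, context);
    for (ToyNode input : node.inputs) {
      visit(input, context);
    }
  }
}

// Example subclass: records node names in pre-order.
final class NameCollector extends ToyDepthFirstVisitor<List<String>> {
  @Override
  protected void preChildren(ToyNode node, List<String> names) {
    names.add(node.name);
  }
}
```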



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java:
##########
@@ -28,7 +30,7 @@
 
 
 public class MailboxSendNode extends BasePlanNode {
-  private final int _receiverStageId;
+  private final BitSet _receiverStages;

Review Comment:
   How are you planning to handle the corresponding plan serde changes in a 
compatible manner?
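   One possible approach, sketched under loudly stated assumptions: the wire format and the field names `receiverStageId` / `receiverStages` in `ToySendNodeMessage` are hypothetical, not Pinot's actual plan serialization. The writer keeps the legacy single-receiver field populated with the lowest receiver stage and adds a list with all receivers; the reader prefers the list and falls back to the legacy field for plans written by older versions.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical wire message; field names are assumptions for illustration only.
final class ToySendNodeMessage {
  int receiverStageId = -1;                               // legacy field: one receiver
  final List<Integer> receiverStages = new ArrayList<>(); // new field: all receivers
}

final class ReceiverStagesCodec {
  static ToySendNodeMessage write(BitSet receivers) {
    ToySendNodeMessage msg = new ToySendNodeMessage();
    // Keep the legacy field populated so older readers still find a receiver.
    msg.receiverStageId = receivers.nextSetBit(0);
    for (int i = receivers.nextSetBit(0); i >= 0; i = receivers.nextSetBit(i + 1)) {
      msg.receiverStages.add(i);
    }
    return msg;
  }

  static BitSet read(ToySendNodeMessage msg) {
    BitSet receivers = new BitSet();
    if (msg.receiverStages.isEmpty()) {
      // Written by an older version: only the legacy field is set.
      if (msg.receiverStageId >= 0) {
        receivers.set(msg.receiverStageId);
      }
    } else {
      for (int stage : msg.receiverStages) {
        receivers.set(stage);
      }
    }
    return receivers;
  }
}
```

   One caveat with this scheme: an older reader still sees only one receiver, so it would only be safe if plans with multiple receivers never reach older servers, for example during a rolling upgrade.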



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java:
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+
+
+/**
+ * EquivalentStageReplacer is used to replace equivalent stages in the query plan.
+ *
+ * Given a {@link org.apache.pinot.query.planner.plannode.PlanNode} and a
+ * {@link GroupedStages}, modifies the plan node to replace equivalent stages.
+ *
+ * For each {@link MailboxReceiveNode} in the plan, if the sender is not the leader of the group,
+ * replaces the sender with the leader.
+ * The leader is also updated to include the receiver in its list of receivers.
+ */
+public class EquivalentStagesReplacer {
+  private EquivalentStagesReplacer() {
+  }
+
+  /**
+   * Replaces the equivalent stages in the query plan.
+   *
+   * @param root Root plan node
+   * @param equivalentStages Equivalent stages
+   */
+  public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
+    root.visit(Replacer.INSTANCE, equivalentStages);
+  }
+
+  private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> {

Review Comment:
   Nice, didn't expect the actual plan modification to be so simple and elegant!
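   For readers following along, the substitution described in the class Javadoc can be modeled with a small sketch. `Send`, `Receive` and `ToyReplacer` are hypothetical toy types, and a plain `Map` from each send to its group leader stands in for `GroupedStages`. The branch structure mirrors `visitMailboxReceive` and `canSubstitute` from the diff, including descending into the leader when no substitution happens.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

// Hypothetical toy send node (not Pinot's MailboxSendNode).
final class Send {
  final int stageId;
  final BitSet receiverStages = new BitSet();
  final List<Receive> inputs = new ArrayList<>(); // receive nodes inside this stage

  Send(int stageId) {
    this.stageId = stageId;
  }
}

// Hypothetical toy receive node (not Pinot's MailboxReceiveNode).
final class Receive {
  final int stageId; // stage that contains this receive node
  Send sender;

  Receive(int stageId, Send sender) {
    this.stageId = stageId;
    this.sender = sender;
    sender.receiverStages.set(stageId);
  }
}

final class ToyReplacer {
  // leaderOf maps every send node to the leader of its equivalence group.
  static void replace(Receive receive, Map<Send, Send> leaderOf) {
    Send sender = receive.sender;
    Send leader = leaderOf.get(sender);
    boolean canSubstitute = sender != leader
        && !leader.receiverStages.intersects(sender.receiverStages);
    if (canSubstitute) {
      // The old sender's subtree is pruned, so it is not visited.
      receive.sender = leader;
      leader.receiverStages.set(receive.stageId);
    } else {
      // As in the diff, descend into the leader's inputs.
      for (Receive nested : leader.inputs) {
        replace(nested, leaderOf);
      }
    }
  }
}
```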



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java:
##########
@@ -50,8 +53,42 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs
     _sort = sort;
   }
 
+  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs,
+      int receiverStage, PinotRelExchangeType exchangeType,
+      RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned,
+      @Nullable List<RelFieldCollation> collations, boolean sort) {
+    this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, distributionType, keys, prePartitioned,
+        collations, sort);
+  }
+
+  private static BitSet toBitSet(int receiverStage) {
+    BitSet bitSet = new BitSet(receiverStage + 1);
+    bitSet.set(receiverStage);
+    return bitSet;
+  }
+
+  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs,
+      PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List<Integer> keys,
+      boolean prePartitioned, @Nullable List<RelFieldCollation> collations, boolean sort) {
+    this(stageId, dataSchema, inputs, null, exchangeType, distributionType, keys, prePartitioned, collations, sort);
+  }
+
+  public BitSet getReceiverStages() {
+    Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
+    return (BitSet) _receiverStages.clone();
+  }
+
+  @Deprecated
   public int getReceiverStageId() {
-    return _receiverStageId;
+    Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
+    return _receiverStages.nextSetBit(0);
+  }
+
+  public void addReceiver(MailboxReceiveNode node) {
+    if (_receiverStages.get(node.getStageId())) {
+      throw new IllegalStateException("Receiver already added: " + node.getStageId());
+    }
+    _receiverStages.set(node.getStageId());

Review Comment:
   This PR doesn't introduce the issue, but considering that a stage can have
more than one mailbox receive node, isn't it a little confusing to
identify a receiver only by its stage ID?
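   A sketch of what an alternative identity could look like, assuming a hypothetical per-stage ID for each receive node (`ReceiverKey` and `nodeIdInStage` are invented for illustration and do not exist in Pinot). Two receive nodes in the same stage stay distinct, whereas a `BitSet` of stage IDs collapses them into a single bit.

```java
import java.util.Objects;

// Hypothetical receiver identity: stage id plus a per-stage receive node id.
final class ReceiverKey implements Comparable<ReceiverKey> {
  final int stageId;       // stage that contains the receive node
  final int nodeIdInStage; // hypothetical id of the receive node within its stage

  ReceiverKey(int stageId, int nodeIdInStage) {
    this.stageId = stageId;
    this.nodeIdInStage = nodeIdInStage;
  }

  // Orders by stage first, then by node id within the stage.
  @Override
  public int compareTo(ReceiverKey other) {
    int byStage = Integer.compare(stageId, other.stageId);
    return byStage != 0 ? byStage : Integer.compare(nodeIdInStage, other.nodeIdInStage);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ReceiverKey)) {
      return false;
    }
    ReceiverKey that = (ReceiverKey) o;
    return stageId == that.stageId && nodeIdInStage == that.nodeIdInStage;
  }

  @Override
  public int hashCode() {
    return Objects.hash(stageId, nodeIdInStage);
  }

  @Override
  public String toString() {
    return stageId + "/" + nodeIdInStage;
  }
}
```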



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java:
##########
@@ -77,6 +78,8 @@ public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage)
    */
   public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups();
 
+  public abstract Set<MailboxSendNode> getStages();

Review Comment:
   A small doc comment might be useful here to distinguish this method from
`getLeaders` / `getGroups`.
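   A possible shape for such doc comments, sketched on a hypothetical `ToyGroupedStages` that uses integer stage IDs instead of `MailboxSendNode`. The semantics assumed for `getStages()` (every stage in every group) and `getLeaders()` (the first element of each group) are inferred from the method names and from `getGroup(sender).first()` in the replacer, not confirmed by the PR.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for GroupedStages; groups must be non-empty.
final class ToyGroupedStages {
  private final SortedSet<SortedSet<Integer>> _groups;

  ToyGroupedStages(List<SortedSet<Integer>> groups) {
    // Groups are ordered by their leader, i.e. their first (smallest) element.
    Comparator<SortedSet<Integer>> byLeader = (a, b) -> Integer.compare(a.first(), b.first());
    _groups = new TreeSet<>(byLeader);
    _groups.addAll(groups);
  }

  /**
   * Returns all groups of equivalent stages. Each group is sorted and its first element is the leader.
   */
  SortedSet<SortedSet<Integer>> getGroups() {
    return Collections.unmodifiableSortedSet(_groups);
  }

  /**
   * Returns only the leader of each group, so exactly one stage per group.
   */
  SortedSet<Integer> getLeaders() {
    SortedSet<Integer> leaders = new TreeSet<>();
    for (SortedSet<Integer> group : _groups) {
      leaders.add(group.first());
    }
    return leaders;
  }

  /**
   * Returns every stage in every group, leaders and non-leaders alike (the union of all groups).
   * Unlike {@link #getLeaders()}, the result can contain several stages from the same group.
   */
  Set<Integer> getStages() {
    Set<Integer> stages = new HashSet<>();
    for (SortedSet<Integer> group : _groups) {
      stages.addAll(group);
    }
    return stages;
  }
}
```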



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java:
##########
@@ -150,6 +150,7 @@ public PlanNode visitExchange(ExchangeNode node, Context context) {
     // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leave node
     // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment.
     int receiverPlanFragmentId = context._currentPlanFragmentId;
+    int receivedIdInPlan = -1; // TODO: Change this

Review Comment:
   What is this for?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java:
##########
@@ -0,0 +1,79 @@
+package org.apache.pinot.query.planner.logical;
+
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+
+
+/**
+ * EquivalentStageReplacer is used to replace equivalent stages in the query plan.
+ *
+ * Given a {@link org.apache.pinot.query.planner.plannode.PlanNode} and a
+ * {@link GroupedStages}, modifies the plan node to replace equivalent stages.
+ *
+ * For each {@link MailboxReceiveNode} in the plan, if the sender is not the leader of the group,
+ * replaces the sender with the leader.
+ * The leader is also updated to include the receiver in its list of receivers.
+ */
+public class EquivalentStagesReplacer {
+  private EquivalentStagesReplacer() {
+  }
+
+  /**
+   * Replaces the equivalent stages in the query plan.
+   *
+   * @param root Root plan node
+   * @param equivalentStages Equivalent stages
+   */
+  public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) {
+    root.visit(Replacer.INSTANCE, equivalentStages);
+  }
+
+  private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> {
+    private static final Replacer INSTANCE = new Replacer();
+
+    private Replacer() {
+    }
+
+    @Override
+    public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalenceGroups) {
+      MailboxSendNode sender = node.getSender();
+      MailboxSendNode leader = equivalenceGroups.getGroup(sender).first();
+      if (canSubstitute(sender, leader)) {
+        // we don't want to visit the children of the node given it is going to be pruned
+        node.setSender(leader);
+        leader.addReceiver(node);
+      } else {
+        visitMailboxSend(leader, equivalenceGroups);
+      }
+      return null;
+    }
+
+    private boolean canSubstitute(MailboxSendNode actualSender, MailboxSendNode leader) {
+      return actualSender != leader // we don't need to replace the leader with itself
+          // the leader is already sending to this stage. Given we don't have the ability to send to multiple
+          // receivers in the same stage, we cannot optimize this case right now.
+          // If this case seems to be useful, it can be supported in the future.
+          && !leader.getReceiverStages().intersects(actualSender.getReceiverStages());
+    }

Review Comment:
   Is this for cases like self joins?
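   To make the question concrete, here is a sketch of just the receiver-overlap part of `canSubstitute`, using a hypothetical `SubstitutionCheck` helper over plain `BitSet`s. The scenario in the test (two equivalent stages, 2 and 3, that both send to join stage 1, as a self join might produce) is an assumption about the motivating case, not something the PR states.

```java
import java.util.BitSet;

// Hypothetical helper reproducing only the receiver-overlap logic of canSubstitute().
final class SubstitutionCheck {
  // sameNode: whether the actual sender already is the group leader.
  static boolean canSubstitute(boolean sameNode, BitSet senderReceivers, BitSet leaderReceivers) {
    return !sameNode && !leaderReceivers.intersects(senderReceivers);
  }

  // Builds a BitSet containing the given receiver stage ids.
  static BitSet stages(int... ids) {
    BitSet bits = new BitSet();
    for (int id : ids) {
      bits.set(id);
    }
    return bits;
  }
}
```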



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java:
##########
@@ -50,8 +53,42 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs
     _sort = sort;
   }
 
+  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs,
+      int receiverStage, PinotRelExchangeType exchangeType,
+      RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned,
+      @Nullable List<RelFieldCollation> collations, boolean sort) {
+    this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, distributionType, keys, prePartitioned,
+        collations, sort);
+  }
+
+  private static BitSet toBitSet(int receiverStage) {
+    BitSet bitSet = new BitSet(receiverStage + 1);
+    bitSet.set(receiverStage);
+    return bitSet;
+  }
+
+  public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs,
+      PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List<Integer> keys,
+      boolean prePartitioned, @Nullable List<RelFieldCollation> collations, boolean sort) {
+    this(stageId, dataSchema, inputs, null, exchangeType, distributionType, keys, prePartitioned, collations, sort);
+  }
+
+  public BitSet getReceiverStages() {
+    Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set");
+    return (BitSet) _receiverStages.clone();
+  }

Review Comment:
   nit: a small doc comment clarifying that modifying the returned bitset doesn't
affect this node's receiver stages might be useful.
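   For example, a sketch of such a doc comment on a hypothetical `ReceiverStagesHolder` that mirrors the getter in the diff. It throws `IllegalStateException` directly instead of using Guava's `Preconditions`, only to stay self-contained, and `addReceiverStage` is an invented helper.

```java
import java.util.BitSet;

// Hypothetical holder mirroring getReceiverStages() from the diff.
final class ReceiverStagesHolder {
  private final BitSet _receiverStages = new BitSet();

  // Hypothetical mutator, used here only to populate the holder.
  void addReceiverStage(int stageId) {
    _receiverStages.set(stageId);
  }

  /**
   * Returns the ids of the stages that receive data from this node.
   *
   * <p>The returned {@link BitSet} is a copy: modifying it does not change the
   * receiver stages of this node.
   *
   * @throws IllegalStateException if no receiver has been set yet
   */
  BitSet getReceiverStages() {
    if (_receiverStages.isEmpty()) {
      throw new IllegalStateException("Receivers not set");
    }
    return (BitSet) _receiverStages.clone();
  }
}
```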



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
