yashmayya commented on code in PR #14495: URL: https://github.com/apache/pinot/pull/14495#discussion_r1849912468
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java: ########## @@ -126,7 +133,8 @@ public boolean equals(Object o) { return false; } MailboxReceiveNode that = (MailboxReceiveNode) o; - return _senderStageId == that._senderStageId && _sort == that._sort && _sortedOnSender == that._sortedOnSender + return getSenderStageId() == that.getSenderStageId() && Objects.equals(_sender, that._sender) Review Comment: Looks like this change is causing all the test cases in `PlanNodeSerDeTest` to fail because the sender is not serialized into the plan. ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java: ########## @@ -117,96 +117,120 @@ protected boolean traverseStageBoundary() { return true; } + /** + * The method that is called by default to handle a node that does not have a specific visit method. + * + * This method can be overridden to provide a default behavior for all nodes. + * + * The returned value of this method is ignored by default + */ + protected T preChildren(PlanNode node, C context) { Review Comment: Why not make this a `void` function then? Doesn't seem like there are any future planned use cases for the return value here either? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java: ########## @@ -28,7 +30,7 @@ public class MailboxSendNode extends BasePlanNode { - private final int _receiverStageId; + private final BitSet _receiverStages; Review Comment: How are you planning to handle the corresponding plan serde changes in a compatible manner? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java: ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +/** + * EquivalentStageReplacer is used to replace equivalent stages in the query plan. + * + * Given a {@link org.apache.pinot.query.planner.plannode.PlanNode} and a + * {@link GroupedStages}, modifies the plan node to replace equivalent stages. + * + * For each {@link MailboxReceiveNode} in the plan, if the sender is not the leader of the group, + * replaces the sender with the leader. + * The leader is also updated to include the receiver in its list of receivers. + */ +public class EquivalentStagesReplacer { + private EquivalentStagesReplacer() { + } + + /** + * Replaces the equivalent stages in the query plan. 
+ * + * @param root Root plan node + * @param equivalentStages Equivalent stages + */ + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { + root.visit(Replacer.INSTANCE, equivalentStages); + } + + private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> { Review Comment: Nice, didn't expect the actual plan modification to be so simple and elegant! ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java: ########## @@ -50,8 +53,42 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs _sort = sort; } + public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs, + int receiverStage, PinotRelExchangeType exchangeType, + RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned, + @Nullable List<RelFieldCollation> collations, boolean sort) { + this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, distributionType, keys, prePartitioned, + collations, sort); + } + + private static BitSet toBitSet(int receiverStage) { + BitSet bitSet = new BitSet(receiverStage + 1); + bitSet.set(receiverStage); + return bitSet; + } + + public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs, + PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List<Integer> keys, + boolean prePartitioned, @Nullable List<RelFieldCollation> collations, boolean sort) { + this(stageId, dataSchema, inputs, null, exchangeType, distributionType, keys, prePartitioned, collations, sort); + } + + public BitSet getReceiverStages() { + Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set"); + return (BitSet) _receiverStages.clone(); + } + + @Deprecated public int getReceiverStageId() { - return _receiverStageId; + Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set"); + return 
_receiverStages.nextSetBit(0); + } + + public void addReceiver(MailboxReceiveNode node) { + if (_receiverStages.get(node.getStageId())) { + throw new IllegalStateException("Receiver already added: " + node.getStageId()); + } + _receiverStages.set(node.getStageId()); Review Comment: This PR doesn't introduce the issue - but considering that a stage can have more than one mailbox receive node, doesn't it seem a little confusing to identify a receiver only by its stage ID? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -77,6 +78,8 @@ public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) */ public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + public abstract Set<MailboxSendNode> getStages(); Review Comment: Small doc comment might be useful here to distinguish this method from `getLeaders` / `getGroups`. ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java: ########## @@ -150,6 +150,7 @@ public PlanNode visitExchange(ExchangeNode node, Context context) { // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leave node // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment. int receiverPlanFragmentId = context._currentPlanFragmentId; + int receivedIdInPlan = -1; // TODO: Change this Review Comment: What is this for? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java: ########## @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +/** + * EquivalentStageReplacer is used to replace equivalent stages in the query plan. + * + * Given a {@link org.apache.pinot.query.planner.plannode.PlanNode} and a + * {@link GroupedStages}, modifies the plan node to replace equivalent stages. + * + * For each {@link MailboxReceiveNode} in the plan, if the sender is not the leader of the group, + * replaces the sender with the leader. + * The leader is also updated to include the receiver in its list of receivers. + */ +public class EquivalentStagesReplacer { + private EquivalentStagesReplacer() { + } + + /** + * Replaces the equivalent stages in the query plan. 
+ * + * @param root Root plan node + * @param equivalentStages Equivalent stages + */ + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { + root.visit(Replacer.INSTANCE, equivalentStages); + } + + private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor<Void, GroupedStages> { + private static final Replacer INSTANCE = new Replacer(); + + private Replacer() { + } + + @Override + public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalenceGroups) { + MailboxSendNode sender = node.getSender(); + MailboxSendNode leader = equivalenceGroups.getGroup(sender).first(); + if (canSubstitute(sender, leader)) { + // we don't want to visit the children of the node given it is going to be pruned + node.setSender(leader); + leader.addReceiver(node); + } else { + visitMailboxSend(leader, equivalenceGroups); + } + return null; + } + + private boolean canSubstitute(MailboxSendNode actualSender, MailboxSendNode leader) { + return actualSender != leader // we don't need to replace the leader with itself + // the leader is already sending to this stage. Given we don't have the ability to send to multiple + // receivers in the same stage, we cannot optimize this case right now. + // If this case seems to be useful, it can be supported in the future. + && !leader.getReceiverStages().intersects(actualSender.getReceiverStages()); + } Review Comment: Is this for cases like self joins? 
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java: ########## @@ -50,8 +53,42 @@ public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs _sort = sort; } + public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs, + int receiverStage, PinotRelExchangeType exchangeType, + RelDistribution.Type distributionType, @Nullable List<Integer> keys, boolean prePartitioned, + @Nullable List<RelFieldCollation> collations, boolean sort) { + this(stageId, dataSchema, inputs, toBitSet(receiverStage), exchangeType, distributionType, keys, prePartitioned, + collations, sort); + } + + private static BitSet toBitSet(int receiverStage) { + BitSet bitSet = new BitSet(receiverStage + 1); + bitSet.set(receiverStage); + return bitSet; + } + + public MailboxSendNode(int stageId, DataSchema dataSchema, List<PlanNode> inputs, + PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List<Integer> keys, + boolean prePartitioned, @Nullable List<RelFieldCollation> collations, boolean sort) { + this(stageId, dataSchema, inputs, null, exchangeType, distributionType, keys, prePartitioned, collations, sort); + } + + public BitSet getReceiverStages() { + Preconditions.checkState(!_receiverStages.isEmpty(), "Receivers not set"); + return (BitSet) _receiverStages.clone(); + } Review Comment: nit: small doc comment clarifying that modifying the returned bitset doesn't affect this node's receiver stages might be useful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org