yashmayya commented on code in PR #14296: URL: https://github.com/apache/pinot/pull/14296#discussion_r1816493476
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ParentToChildrenCalculator.java: ########## @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +/** + * Utility class to calculate the parent to children mapping for a given plan tree. + */ +public class ParentToChildrenCalculator { + private ParentToChildrenCalculator() { + } + + /** + * Returns an identity map indexed by the parent node, with the value being a set of its <strong>direct</strong> child + * nodes. 
+ */ + public static IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> calculate(MailboxSendNode root) { + Visitor visitor = new Visitor(); + root.getInputs().forEach(node -> node.visit(visitor, root)); + + return visitor._parentToChild; + } + + private static class Visitor extends PlanNodeVisitor.DepthFirstVisitor<Void, MailboxSendNode> { + private IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> _parentToChild = new IdentityHashMap<>(); + + @Override + public Void visitMailboxSend(MailboxSendNode node, MailboxSendNode parent) { + _parentToChild.computeIfAbsent(parent, k -> Collections.newSetFromMap(new IdentityHashMap<>())).add(node); + visitChildren(node, node); // children will be called with the current node as the parent Review Comment: The "parent" `MailboxSendNode` is the one being passed along as the context object? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. 
+ * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. + */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); Review Comment: Why do we need both these structures, isn't the map sufficient for all the operations? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ParentToChildrenCalculator.java: ########## @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +/** + * Utility class to calculate the parent to children mapping for a given plan tree. + */ +public class ParentToChildrenCalculator { + private ParentToChildrenCalculator() { + } + + /** + * Returns an identity map indexed by the parent node, with the value being a set of its <strong>direct</strong> child + * nodes. + */ + public static IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> calculate(MailboxSendNode root) { + Visitor visitor = new Visitor(); + root.getInputs().forEach(node -> node.visit(visitor, root)); + + return visitor._parentToChild; + } + + private static class Visitor extends PlanNodeVisitor.DepthFirstVisitor<Void, MailboxSendNode> { + private IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> _parentToChild = new IdentityHashMap<>(); + + @Override + public Void visitMailboxSend(MailboxSendNode node, MailboxSendNode parent) { + _parentToChild.computeIfAbsent(parent, k -> Collections.newSetFromMap(new IdentityHashMap<>())).add(node); Review Comment: Why do we need to use an identity hash map and a set backed by an identity hash map here? 
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java: ########## @@ -63,4 +63,150 @@ public interface PlanNodeVisitor<T, C> { T visitExchange(ExchangeNode exchangeNode, C context); T visitExplained(ExplainedNode node, C context); + + /** + * A depth-first visitor that visits all children of a node before visiting the node itself. + * + * The default implementation for each plan node type does nothing but visiting its inputs + * (see {@link #visitChildren(PlanNode, Object)}) and then returning the result of calling + * {@link #defaultCase(PlanNode, Object)}. + * + * Subclasses can override each method to provide custom behavior for each plan node type. + * For example: + * + * <pre> + * public ResultClass visitMailboxSend(MailboxSendNode node, ContextClass context) { + * somethingToDoBeforeChildren(node); + * visitChildren(node, context); + * return somethingToDoAfterChildren(node); + * } + * </pre> + * + * It is not mandatory to override all methods nor to call {@link #visitChildren(PlanNode, Object)} when + * overriding a visit method. + * + * Notice that {@link MailboxReceiveNode} nodes do not have inputs. Instead, they may store the sender node in a + * different field. Whether to visit the sender node when visiting a {@link MailboxReceiveNode} is controlled by + * {@link #traverseStageBoundary()}. + * + * @param <T> + * @param <C> + */ + abstract class DepthFirstVisitor<T, C> implements PlanNodeVisitor<T, C> { + + /** + * Visits all children of a node. + * + * Notice that {@link MailboxReceiveNode} nodes do not have inputs and therefore this method is a no-op for them. + * The default {@link #visitMailboxReceive(MailboxReceiveNode, Object)} implementation will visit the sender node + * if {@link #traverseStageBoundary()} returns true, but if it is overridden, it is up to the implementor to decide + * whether to visit the sender node or not. 
+ */ + protected void visitChildren(PlanNode node, C context) { + for (PlanNode input : node.getInputs()) { + input.visit(this, context); + } + } + + /** + * Whether to visit the sender node (going to another stage) when visiting a {@link MailboxReceiveNode}. + * + * Defaults to true. + */ + protected boolean traverseStageBoundary() { + return true; + } + + /** + * The method that is called by default to handle a node that does not have a specific visit method. + * + * This method can be overridden to provide a default behavior for all nodes. + * + * The returned value of this method is what each default visit method will return. + */ + protected T defaultCase(PlanNode node, C context) { + return null; + } + + @Override + public T visitAggregate(AggregateNode node, C context) { + visitChildren(node, context); + return defaultCase(node, context); + } + + @Override + public T visitFilter(FilterNode node, C context) { + visitChildren(node, context); + return defaultCase(node, context); + } + + @Override + public T visitJoin(JoinNode node, C context) { + visitChildren(node, context); + return defaultCase(node, context); + } + + @Override + public T visitMailboxReceive(MailboxReceiveNode node, C context) { + visitChildren(node, context); Review Comment: I guess this isn't required given https://github.com/apache/pinot/pull/14294? Although I'm curious why we didn't just model the sender node as an input to the receiver node? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java: ########## @@ -108,6 +108,11 @@ public PlanNode withInputs(List<PlanNode> inputs) { _keys, _collations, _sort, _sortedOnSender, _sender); } + public MailboxReceiveNode withSender(MailboxSendNode sender) { Review Comment: Is this being used in a subsequent patch? 
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java: ########## @@ -130,4 +130,12 @@ public int hashCode() { return Objects.hash(super.hashCode(), _receiverStageId, _exchangeType, _distributionType, _keys, _prePartitioned, _collations, _sort); } + + @Override + public String toString() { + return "MailboxSendNode{" Review Comment: nit: might be useful to also add exchange type, distribution type etc.? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. 
+ * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. + * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. 
+ */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); + + /** + * Adds a new group of equivalent stages. + * + * @param node The stage that will be the only member of the group. + * @return this object + * @throws IllegalArgumentException if the stage was already added. + */ + public Mutable addNewGroup(MailboxSendNode node) { + Preconditions.checkArgument(!containsStage(node), "Stage {} was already added", node.getStageId()); + SortedSet<MailboxSendNode> group = new TreeSet<>(STAGE_COMPARATOR); + group.add(node); + _groups.add(group); + _stageToGroup.put(node, group); + return this; + } + + /** + * Adds a stage to an existing group. + * @param original A stage that is already in the group. + * @param newNode The stage to be added to the group. 
+ * @return this object + */ + public Mutable addToGroup(MailboxSendNode original, MailboxSendNode newNode) { + Preconditions.checkArgument(!containsStage(newNode), "Stage {} was already added", newNode.getStageId()); + SortedSet<MailboxSendNode> group = getGroup(original); + group.add(newNode); + _stageToGroup.put(newNode, group); + return this; + } + + @Override + public SortedSet<MailboxSendNode> getLeaders() { + return _groups.stream() + .map(SortedSet::first) + .collect(Collectors.toCollection(() -> new TreeSet<>(STAGE_COMPARATOR))); + } + + @Override + public SortedSet<SortedSet<MailboxSendNode>> getGroups() { + return _groups; + } + + @Override + public boolean containsStage(MailboxSendNode stage) { + return _stageToGroup.containsKey(stage); + } + + @Override + public SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException { + SortedSet<MailboxSendNode> group = _stageToGroup.get(stage); + if (group == null) { + throw new NoSuchElementException("Stage " + stage.getStageId() + " is unknown by this class"); + } + return group; + } + + public Mutable removeStage(MailboxSendNode stage) { Review Comment: When would we need to remove a stage from the set of grouped stages? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java: ########## @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Objects; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.ExplainedNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This utility class can be used to find equivalent stages in the query plan. + * + * Equivalent stages are stages that represent the same job to be done. These stages can be potentially optimized to + * execute that job only once in a special stage that broadcast the results to all the equivalent stages. 
+ */ +public class EquivalentStagesFinder { + public static final Logger LOGGER = LoggerFactory.getLogger(EquivalentStagesFinder.class); + + private EquivalentStagesFinder() { + } + + public static GroupedStages findEquivalentStages(MailboxSendNode root) { + Visitor visitor = new Visitor(); + root.visit(visitor, null); + + return visitor._equivalentStages; + } + + /** + * A visitor that iterates the plan tree and finds equivalent stages. + * + * It may be a bit confusing that this class, which ends up being a visitor, calls another visitor to compare nodes. + * The reason is that this object implements visitor to iterate the plan tree in pre-order. Then for each + * mailbox send node (which are always the root of a stage), it calls + * {@link NodeEquivalence#areEquivalent(MailboxSendNode, MailboxSendNode)}. NodeEquivalence is another class that + * implements visitor, but this time to compare two nodes. + */ + private static class Visitor extends PlanNodeVisitor.DepthFirstVisitor<Void, Void> { + private final GroupedStages.Mutable _equivalentStages = new GroupedStages.Mutable(); + private final NodeEquivalence _nodeEquivalence = new NodeEquivalence(); + + @Override + public Void visitMailboxSend(MailboxSendNode node, Void context) { + // It is important to visit children before doing anything. + // This is a requirement on NodeEquivalence.areEquivalent() method that reduce the complexity of the algorithm + // from O(n^3) to O(n^2). + visitChildren(node, context); + + // first try to find if the current node/stage is equivalent to an already equivalent stages. + for (MailboxSendNode uniqueStage : _equivalentStages.getLeaders()) { + if (_nodeEquivalence.areEquivalent(node, uniqueStage)) { + _equivalentStages.addToGroup(uniqueStage, node); + return null; + } + } + // there is no visited stage that is equivalent to the current stage, so add it to the unique visited stages. 
+ _equivalentStages.addNewGroup(node); + return null; + } + + /** + * A visitor that compares two nodes to see if they are equivalent. + * + * The implementation uses the already visited stages (stored in {@link #_equivalentStages}) to avoid comparing the + * same nodes multiple times. The side effect of that is that the second argument for {@link #areEquivalent} must be + * a node that was already visited. + */ + private class NodeEquivalence implements PlanNodeVisitor<Boolean, PlanNode> { + + /** + * Returns whether the given stage is equivalent to the visited stage. + * <p> + * This method assumes that all sub-stages of an already visited stage are also already visited. + * + * @param stage the stage we want to know if it is equivalent to the visited stage. This stage may or may + * not be already visited. + * @param visitedStage the stage we want to compare the given stage with. This stage must be already visited. + */ + public boolean areEquivalent(MailboxSendNode stage, MailboxSendNode visitedStage) { + Preconditions.checkState( + _equivalentStages.containsStage(visitedStage), "Node {} was not visited yet", visitedStage); + return stage.visit(this, visitedStage); + } + + /** + * This method apply the common equivalence checks that apply for all nodes. + * + * @return true if the nodes are equivalent taking into account the common equivalence checks (ie inputs, hints, + * data schema, etc). + */ + private boolean baseNode(PlanNode node1, PlanNode node2) { Review Comment: ```suggestion private boolean areBaseNodesEquivalent(PlanNode node1, PlanNode node2) { ``` nit: IMO `baseNode` by itself doesn't convey what the method is doing. ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java: ########## @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Objects; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.ExplainedNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This utility class can be used to find equivalent stages in the query plan. 
+ * + * Equivalent stages are stages that represent the same job to be done. These stages can be potentially optimized to + * execute that job only once in a special stage that broadcast the results to all the equivalent stages. + */ +public class EquivalentStagesFinder { + public static final Logger LOGGER = LoggerFactory.getLogger(EquivalentStagesFinder.class); + + private EquivalentStagesFinder() { + } + + public static GroupedStages findEquivalentStages(MailboxSendNode root) { Review Comment: Will this be called with the mailbox that sends data to the broker's receiving mailbox in stage 0? I'm trying to better visualize the flow of the equivalence algorithm here: every time we visit a mailbox send node in the depth-first visitor, we'll check whether it is equivalent to every other known mailbox send node (or at least to the representatives of the equivalence groups computed so far), and in the equivalence check we visit all the inputs and even traverse mailbox/stage boundaries, right? And since this is a depth-first visitor, the lower mailbox send nodes (lower in the tree, but with a higher stage ID) will already have been visited, so we should be able to check directly whether those child mailbox send nodes belong to the same equivalence group or not. Is this understanding correct? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { Review Comment: I'm not sure I follow the rationale for the design choice here - why is the static nested class `Mutable` separated from the parent abstract class? 
This doesn't seem generic enough to have any alternate future implementations unless I'm missing something here? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ParentToChildrenCalculator.java: ########## @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +/** + * Utility class to calculate the parent to children mapping for a given plan tree. + */ +public class ParentToChildrenCalculator { + private ParentToChildrenCalculator() { + } + + /** + * Returns an identity map indexed by the parent node, with the value being a set of its <strong>direct</strong> child + * nodes. 
+ */ + public static IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> calculate(MailboxSendNode root) { + Visitor visitor = new Visitor(); + root.getInputs().forEach(node -> node.visit(visitor, root)); + + return visitor._parentToChild; + } + + private static class Visitor extends PlanNodeVisitor.DepthFirstVisitor<Void, MailboxSendNode> { + private IdentityHashMap<MailboxSendNode, Set<MailboxSendNode>> _parentToChild = new IdentityHashMap<>(); + + @Override + public Void visitMailboxSend(MailboxSendNode node, MailboxSendNode parent) { + _parentToChild.computeIfAbsent(parent, k -> Collections.newSetFromMap(new IdentityHashMap<>())).add(node); + visitChildren(node, node); // children will be called with the current node as the parent Review Comment: If I understand correctly, we're trying to collect all the `MailboxSendNode`s and associate each with the set of `MailboxSendNode`s in the immediate lower stages? How and where exactly is this going to be used in subsequent patches? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java: ########## @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Objects; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.ExplainedNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This utility class can be used to find equivalent stages in the query plan. + * + * Equivalent stages are stages that represent the same job to be done. These stages can be potentially optimized to + * execute that job only once in a special stage that broadcast the results to all the equivalent stages. + */ +public class EquivalentStagesFinder { + public static final Logger LOGGER = LoggerFactory.getLogger(EquivalentStagesFinder.class); + + private EquivalentStagesFinder() { + } + + public static GroupedStages findEquivalentStages(MailboxSendNode root) { + Visitor visitor = new Visitor(); + root.visit(visitor, null); + + return visitor._equivalentStages; + } + + /** + * A visitor that iterates the plan tree and finds equivalent stages. 
+ * + * It may be a bit confusing that this class, which ends up being a visitor, calls another visitor to compare nodes. + * The reason is that this object implements visitor to iterate the plan tree in pre-order. Then for each + * mailbox send node (which are always the root of a stage), it calls + * {@link NodeEquivalence#areEquivalent(MailboxSendNode, MailboxSendNode)}. NodeEquivalence is another class that + * implements visitor, but this time to compare two nodes. + */ + private static class Visitor extends PlanNodeVisitor.DepthFirstVisitor<Void, Void> { + private final GroupedStages.Mutable _equivalentStages = new GroupedStages.Mutable(); + private final NodeEquivalence _nodeEquivalence = new NodeEquivalence(); + + @Override + public Void visitMailboxSend(MailboxSendNode node, Void context) { + // It is important to visit children before doing anything. + // This is a requirement on NodeEquivalence.areEquivalent() method that reduce the complexity of the algorithm + // from O(n^3) to O(n^2). + visitChildren(node, context); + + // first try to find if the current node/stage is equivalent to an already equivalent stages. + for (MailboxSendNode uniqueStage : _equivalentStages.getLeaders()) { + if (_nodeEquivalence.areEquivalent(node, uniqueStage)) { + _equivalentStages.addToGroup(uniqueStage, node); + return null; + } + } + // there is no visited stage that is equivalent to the current stage, so add it to the unique visited stages. + _equivalentStages.addNewGroup(node); + return null; + } + + /** + * A visitor that compares two nodes to see if they are equivalent. + * + * The implementation uses the already visited stages (stored in {@link #_equivalentStages}) to avoid comparing the + * same nodes multiple times. The side effect of that is that the second argument for {@link #areEquivalent} must be + * a node that was already visited. 
+ */ + private class NodeEquivalence implements PlanNodeVisitor<Boolean, PlanNode> { + + /** + * Returns whether the given stage is equivalent to the visited stage. + * <p> + * This method assumes that all sub-stages of an already visited stage are also already visited. + * + * @param stage the stage we want to know if it is equivalent to the visited stage. This stage may or may + * not be already visited. + * @param visitedStage the stage we want to compare the given stage with. This stage must be already visited. + */ + public boolean areEquivalent(MailboxSendNode stage, MailboxSendNode visitedStage) { + Preconditions.checkState( + _equivalentStages.containsStage(visitedStage), "Node {} was not visited yet", visitedStage); + return stage.visit(this, visitedStage); + } + + /** + * This method apply the common equivalence checks that apply for all nodes. + * + * @return true if the nodes are equivalent taking into account the common equivalence checks (ie inputs, hints, + * data schema, etc). + */ + private boolean baseNode(PlanNode node1, PlanNode node2) { + // TODO: DataSchema equality checks enforce order between columns. This is probably not needed for equivalence + // checks, but may require some permutations. We are not changing this for now. 
+ if (!Objects.equals(node1.getDataSchema(), node2.getDataSchema())) { + return false; + } + if (!Objects.equals(node1.getNodeHint(), node2.getNodeHint())) { + return false; + } + List<PlanNode> inputs1 = node1.getInputs(); + List<PlanNode> inputs2 = node2.getInputs(); + if (inputs1.size() != inputs2.size()) { + return false; + } + for (int i = 0; i < inputs1.size(); i++) { + if (!inputs1.get(i).visit(this, inputs2.get(i))) { + return false; + } + } + return true; + } + + @Override + public Boolean visitMailboxSend(MailboxSendNode node1, PlanNode alreadyVisited) { + if (!(alreadyVisited instanceof MailboxSendNode)) { + return false; + } + MailboxSendNode visitedStage = (MailboxSendNode) alreadyVisited; + if (_equivalentStages.containsStage(node1)) { + // both nodes are already visited, so they can only be equivalent if they are in the same equivalence group + return _equivalentStages.getGroup(node1).contains(visitedStage); + } + //@formatter:off + return baseNode(node1, alreadyVisited) + // Commented out fields are used in equals() method of MailboxSendNode but not needed for equivalence. + // Receiver stage is not important for equivalence +// && node1.getReceiverStageId() == that.getReceiverStageId() + && node1.getExchangeType() == visitedStage.getExchangeType() + // Distribution type is not needed for equivalence. We deal with difference distribution types in the + // spooling logic. +// && Objects.equals(node1.getDistributionType(), that.getDistributionType()) + // TODO: Keys could probably be removed from the equivalence check, but would require to verify both + // keys are present in the data schema. We are not doing that for now. + && Objects.equals(node1.getKeys(), visitedStage.getKeys()) + // TODO: Pre-partitioned and collations can probably be removed from the equivalence check, but would + // require some extra checks or transformation on the spooling logic. We are not doing that for now. 
+ && node1.isPrePartitioned() == visitedStage.isPrePartitioned() + && Objects.equals(node1.getCollations(), visitedStage.getCollations()); + //@formatter:on + } + + @Override + public Boolean visitAggregate(AggregateNode node1, PlanNode node2) { + if (!(node2 instanceof AggregateNode)) { + return false; + } + AggregateNode that = (AggregateNode) node2; + //@formatter:off + return baseNode(node1, node2) && Objects.equals(node1.getAggCalls(), that.getAggCalls()) + && Objects.equals(node1.getFilterArgs(), that.getFilterArgs()) + && Objects.equals(node1.getGroupKeys(), that.getGroupKeys()) + && node1.getAggType() == that.getAggType(); + //@formatter:on + } + + @Override + public Boolean visitMailboxReceive(MailboxReceiveNode node1, PlanNode node2) { + if (!(node2 instanceof MailboxReceiveNode)) { + return false; + } + MailboxReceiveNode that = (MailboxReceiveNode) node2; + MailboxSendNode node1Sender = node1.getSender(); + String nullSenderMessage = "This method should only be called at planning time, when the sender for a receiver " + + "node shall be not null."; + Preconditions.checkNotNull(node1Sender, nullSenderMessage); + MailboxSendNode node2Sender = that.getSender(); + Preconditions.checkNotNull(node2Sender, nullSenderMessage); + + // Remember that receive nodes do not have inputs. Their senders are a different attribute. + if (!areEquivalent(node1Sender, node2Sender)) { + return false; + } + + //@formatter:off + return baseNode(node1, node2) + // Commented out fields are used in equals() method of MailboxReceiveNode but not needed for equivalence. + // sender stage id will be different for sure, but we want (and already did) to compare sender equivalence + // instead +// && node1.getSenderStageId() == that.getSenderStageId() + + // TODO: Keys should probably be removed from the equivalence check, but would require to verify both + // keys are present in the data schema. We are not doing that for now. 
+ && Objects.equals(node1.getKeys(), that.getKeys()) + // Distribution type is not needed for equivalence. We deal with difference distribution types in the + // spooling logic. +// && node1.getDistributionType() == that.getDistributionType() + // TODO: Sort, sort on sender and collations can probably be removed from the equivalence check, but would + // require some extra checks or transformation on the spooling logic. We are not doing that for now. + && node1.isSort() == that.isSort() + && node1.isSortedOnSender() == that.isSortedOnSender() + && Objects.equals(node1.getCollations(), that.getCollations()) + && node1.getExchangeType() == that.getExchangeType(); + //@formatter:on + } + + @Override + public Boolean visitFilter(FilterNode node1, PlanNode node2) { + if (!(node2 instanceof FilterNode)) { + return false; + } + FilterNode that = (FilterNode) node2; + return baseNode(node1, node2) && Objects.equals(node1.getCondition(), that.getCondition()); + } + + @Override + public Boolean visitJoin(JoinNode node1, PlanNode node2) { + if (!(node2 instanceof JoinNode)) { + return false; + } + JoinNode that = (JoinNode) node2; + //@formatter:off + return baseNode(node1, node2) && Objects.equals(node1.getJoinType(), that.getJoinType()) + && Objects.equals(node1.getLeftKeys(), that.getLeftKeys()) + && Objects.equals(node1.getRightKeys(), that.getRightKeys()) + && Objects.equals(node1.getNonEquiConditions(), that.getNonEquiConditions()); Review Comment: The `JoinStrategy` doesn't need to be compared? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org