yashmayya commented on code in PR #14296: URL: https://github.com/apache/pinot/pull/14296#discussion_r1821907223
########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readerwriter/ValueReaderComparisonTest.java: ########## @@ -52,6 +53,9 @@ public class ValueReaderComparisonTest { @DataProvider public static Object[] text() { + List<Integer> i1 = Collections.nCopies(NUM_ROUNDS, 1); + List<Integer> i2 = Collections.nCopies(NUM_ROUNDS, 2); + Lists.merge(i1, i2); Review Comment: Stray change? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. + * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. + */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); + + /** + * Adds a new group of equivalent stages. + * + * @param node The stage that will be the only member of the group. + * @return this object + * @throws IllegalArgumentException if the stage was already added. + */ + public Mutable addNewGroup(MailboxSendNode node) { + Preconditions.checkArgument(!containsStage(node), "Stage {} was already added", node.getStageId()); + SortedSet<MailboxSendNode> group = new TreeSet<>(STAGE_COMPARATOR); + group.add(node); + _groups.add(group); + _stageToGroup.put(node, group); + return this; + } + + /** + * Adds a stage to an existing group. + * @param original A stage that is already in the group. + * @param newNode The stage to be added to the group. + * @return this object + */ + public Mutable addToGroup(MailboxSendNode original, MailboxSendNode newNode) { + Preconditions.checkArgument(!containsStage(newNode), "Stage {} was already added", newNode.getStageId()); + SortedSet<MailboxSendNode> group = getGroup(original); + group.add(newNode); + _stageToGroup.put(newNode, group); + return this; + } + + @Override + public SortedSet<MailboxSendNode> getLeaders() { + return _groups.stream() + .map(SortedSet::first) + .collect(Collectors.toCollection(() -> new TreeSet<>(STAGE_COMPARATOR))); + } + + @Override + public SortedSet<SortedSet<MailboxSendNode>> getGroups() { + return _groups; + } + + @Override + public boolean containsStage(MailboxSendNode stage) { + return _stageToGroup.containsKey(stage); + } + + @Override + public SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException { + SortedSet<MailboxSendNode> group = _stageToGroup.get(stage); + if (group == null) { + throw new NoSuchElementException("Stage " + stage.getStageId() + " is unknown by this class"); + } + return group; + } + + public Mutable removeStage(MailboxSendNode stage) { Review Comment: Wouldn't they still be replaced by new stages? The new ones will retain the sender nodes from the original but the rest of the stage will be replaced by a single receive node which gets data from the synthetic stage that contains all the other nodes (apart from the sender) from the equivalent stages right? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java: ########## @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Objects; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.ExplainedNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This utility class can be used to find equivalent stages in the query plan. + * + * Equivalent stages are stages that represent the same job to be done. These stages can be potentially optimized to + * execute that job only once in a special stage that broadcast the results to all the equivalent stages. + */ +public class EquivalentStagesFinder { + public static final Logger LOGGER = LoggerFactory.getLogger(EquivalentStagesFinder.class); + + private EquivalentStagesFinder() { + } + + public static GroupedStages findEquivalentStages(MailboxSendNode root) { Review Comment: Yep, I do think this is easier to understand since it codifies the assumption, thanks! ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void cleanup() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, Review Comment: ```suggestion return new MailboxReceiveNode(stageId, mySchema, nextStageId, null, null, null, null, false, false, ``` This compilation error is causing the build to fail currently. ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. + * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. + */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); Review Comment: Yeah that's fair, although the `SortedSet<MailboxSendNode>[]` definitely seems like it would have crossed the acceptable boundary for complexity - performance tradeoff here 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org