yashmayya commented on code in PR #14296: URL: https://github.com/apache/pinot/pull/14296#discussion_r1820632275
########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void setUp() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, + mailbox); + }; + } + + /** + * Creates a join node that will have the left and right nodes as children. + * + * The join type will be {@link JoinRelType#FULL}, the join strategy will be {@link JoinNode.JoinStrategy#HASH} and + * there will be no conditions. If custom joins are needed feel free to add more builder methods or create your own + * instance of {@link SimpleChildBuilder}. + */ + public SimpleChildBuilder<JoinNode> join( + SimpleChildBuilder<? extends PlanNode> leftBuilder, + SimpleChildBuilder<? extends PlanNode> rightBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode left = leftBuilder.build(stageId); + PlanNode right = rightBuilder.build(stageId); + return new JoinNode(stageId, mySchema, myHints, List.of(left, right), JoinRelType.FULL, + Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), JoinNode.JoinStrategy.HASH); + }; + } + + /** + * Creates a pair of receiver and sender nodes that will be logically connected. + * + * Whenever this builder is used to create a node, the mailbox send node will be added to the list of mailboxes. + * + * Although there are builder methods to create send and receive mailboxes separately, this method is recommended + * because it deals with the stageId management and creates tests that are easier to read. + */ + public SimpleChildBuilder<MailboxReceiveNode> exchange( + int nextStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode input = childBuilder.build(stageId); + MailboxSendNode mailboxSendNode = new MailboxSendNode(nextStageId, null, List.of(input), stageId, null, null, + null, false, null, false); + MailboxSendNode old = _stageRoots.put(nextStageId, mailboxSendNode); + Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", nextStageId); + return new MailboxReceiveNode(stageId, null, List.of(), nextStageId, null, null, null, null, + false, false, mailboxSendNode); + }; + } + + /** + * Creates a table scan node with the given table name. + */ + public SimpleChildBuilder<TableScanNode> tableScan(String tableName) { + return (stageId, mySchema, myHints) -> new TableScanNode(stageId, mySchema, myHints, List.of(), tableName, + Collections.emptyList()); + } + + /** + * Looks for the mailbox that corresponds to the given stageId. + * @throws IllegalStateException if the mailbox is not found. + */ + public MailboxSendNode stage(int stageId) { + MailboxSendNode result = _stageRoots.get(stageId); + Preconditions.checkState(result != null, "Mailbox not found for stageId: %s", stageId); + return result; + } + + /** + * Returns a builder that can be used to create a new mailbox send node. + * + * Whenever this builder is used to create a node, the created node will be added to the list of mailboxes. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxSendNode> sendMailbox( + int newStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode input = childBuilder.build(stageId); + MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId, mySchema, List.of(input), stageId, null, null, + null, false, null, false); + MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode); + Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", stageId); + return mailboxSendNode; + }; + } + + /** + * A builder that can be used to create a child node. + * + * It is not expected for test writers to implement this class. Instead it is recommended to use methods like + * {@link #exchange(int, SimpleChildBuilder)}, {@link #join(SimpleChildBuilder, SimpleChildBuilder)}, + * {@link #tableScan(String)} and others to chain instances of this class. + */ + public interface ChildBuilder<P extends PlanNode> { Review Comment: nit: can be annotated as `FunctionalInterface` - I see that all the concrete implementations are lambda functions anyway. ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod Review Comment: nit: should this be `BeforeMethod` (method is called `setUp` but I suppose it doesn't particularly matter)? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. + * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. + */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); + + /** + * Adds a new group of equivalent stages. + * + * @param node The stage that will be the only member of the group. + * @return this object + * @throws IllegalArgumentException if the stage was already added. + */ + public Mutable addNewGroup(MailboxSendNode node) { + Preconditions.checkArgument(!containsStage(node), "Stage {} was already added", node.getStageId()); + SortedSet<MailboxSendNode> group = new TreeSet<>(STAGE_COMPARATOR); + group.add(node); + _groups.add(group); + _stageToGroup.put(node, group); + return this; + } + + /** + * Adds a stage to an existing group. + * @param original A stage that is already in the group. + * @param newNode The stage to be added to the group. + * @return this object + */ + public Mutable addToGroup(MailboxSendNode original, MailboxSendNode newNode) { + Preconditions.checkArgument(!containsStage(newNode), "Stage {} was already added", newNode.getStageId()); + SortedSet<MailboxSendNode> group = getGroup(original); + group.add(newNode); + _stageToGroup.put(newNode, group); + return this; + } + + @Override + public SortedSet<MailboxSendNode> getLeaders() { + return _groups.stream() + .map(SortedSet::first) + .collect(Collectors.toCollection(() -> new TreeSet<>(STAGE_COMPARATOR))); + } + + @Override + public SortedSet<SortedSet<MailboxSendNode>> getGroups() { + return _groups; + } + + @Override + public boolean containsStage(MailboxSendNode stage) { + return _stageToGroup.containsKey(stage); + } + + @Override + public SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException { + SortedSet<MailboxSendNode> group = _stageToGroup.get(stage); + if (group == null) { + throw new NoSuchElementException("Stage " + stage.getStageId() + " is unknown by this class"); + } + return group; + } + + public Mutable removeStage(MailboxSendNode stage) { Review Comment: If S2 and S5 are equivalent, why will both of them be retained in the final plan? ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void setUp() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, + mailbox); + }; + } + + /** + * Creates a join node that will have the left and right nodes as children. + * + * The join type will be {@link JoinRelType#FULL}, the join strategy will be {@link JoinNode.JoinStrategy#HASH} and + * there will be no conditions. If custom joins are needed feel free to add more builder methods or create your own + * instance of {@link SimpleChildBuilder}. + */ + public SimpleChildBuilder<JoinNode> join( + SimpleChildBuilder<? extends PlanNode> leftBuilder, + SimpleChildBuilder<? extends PlanNode> rightBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode left = leftBuilder.build(stageId); + PlanNode right = rightBuilder.build(stageId); Review Comment: Ah never mind, these would be mailbox receive nodes with the same stage ID as the join node - the inputs (senders) to the receive nodes would be in different stages. ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/GroupedStagesTest.java: ########## @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class GroupedStagesTest extends StagesTestBase { + + @Test + public void addOrdered() { + when( + join( + exchange(1, tableScan("T1")), + exchange(2, tableScan("T2")) + ) + ); + + GroupedStages.Mutable mutable = new GroupedStages.Mutable() + .addNewGroup(stage(0)) + .addNewGroup(stage(1)) + .addNewGroup(stage(2)); + assertEquals(mutable.toString(), "[[0], [1], [2]]"); + } + + @Test + public void addUnordered() { + when( + join( + exchange(1, tableScan("T1")), + exchange(2, tableScan("T2")) + ) + ); + GroupedStages.Mutable mutable = new GroupedStages.Mutable() + .addNewGroup(stage(2)) + .addNewGroup(stage(1)) + .addNewGroup(stage(0)); + assertEquals(mutable.toString(), "[[0], [1], [2]]"); + } + + @Test + public void addEquivalence() { + when( + join( + exchange(1, tableScan("T1")), + exchange(2, tableScan("T2")) + ) + ); + GroupedStages.Mutable mutable = new GroupedStages.Mutable() + .addNewGroup(stage(0)) + .addNewGroup(stage(1)) + .addToGroup(stage(0), stage(2)); + assertEquals(mutable.toString(), "[[0, 2], [1]]"); Review Comment: The tests in these classes probably don't need to use the `StagesTestBase` framework right? For instance, here it's pretty confusing because stage 0 and stage 2 are not actually equivalent? ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void setUp() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, + mailbox); + }; + } + + /** + * Creates a join node that will have the left and right nodes as children. + * + * The join type will be {@link JoinRelType#FULL}, the join strategy will be {@link JoinNode.JoinStrategy#HASH} and + * there will be no conditions. If custom joins are needed feel free to add more builder methods or create your own + * instance of {@link SimpleChildBuilder}. + */ + public SimpleChildBuilder<JoinNode> join( + SimpleChildBuilder<? extends PlanNode> leftBuilder, + SimpleChildBuilder<? extends PlanNode> rightBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode left = leftBuilder.build(stageId); + PlanNode right = rightBuilder.build(stageId); Review Comment: In an actual query plan these would be different stage IDs right? ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void setUp() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, + mailbox); + }; + } + + /** + * Creates a join node that will have the left and right nodes as children. + * + * The join type will be {@link JoinRelType#FULL}, the join strategy will be {@link JoinNode.JoinStrategy#HASH} and + * there will be no conditions. If custom joins are needed feel free to add more builder methods or create your own + * instance of {@link SimpleChildBuilder}. + */ + public SimpleChildBuilder<JoinNode> join( + SimpleChildBuilder<? extends PlanNode> leftBuilder, + SimpleChildBuilder<? extends PlanNode> rightBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode left = leftBuilder.build(stageId); + PlanNode right = rightBuilder.build(stageId); + return new JoinNode(stageId, mySchema, myHints, List.of(left, right), JoinRelType.FULL, + Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), JoinNode.JoinStrategy.HASH); + }; + } + + /** + * Creates a pair of receiver and sender nodes that will be logically connected. + * + * Whenever this builder is used to create a node, the mailbox send node will be added to the list of mailboxes. + * + * Although there are builder methods to create send and receive mailboxes separately, this method is recommended + * because it deals with the stageId management and creates tests that are easier to read. + */ + public SimpleChildBuilder<MailboxReceiveNode> exchange( + int nextStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode input = childBuilder.build(stageId); + MailboxSendNode mailboxSendNode = new MailboxSendNode(nextStageId, null, List.of(input), stageId, null, null, + null, false, null, false); + MailboxSendNode old = _stageRoots.put(nextStageId, mailboxSendNode); + Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", nextStageId); + return new MailboxReceiveNode(stageId, null, List.of(), nextStageId, null, null, null, null, + false, false, mailboxSendNode); + }; + } + + /** + * Creates a table scan node with the given table name. + */ + public SimpleChildBuilder<TableScanNode> tableScan(String tableName) { + return (stageId, mySchema, myHints) -> new TableScanNode(stageId, mySchema, myHints, List.of(), tableName, + Collections.emptyList()); + } + + /** + * Looks for the mailbox that corresponds to the given stageId. + * @throws IllegalStateException if the mailbox is not found. + */ + public MailboxSendNode stage(int stageId) { + MailboxSendNode result = _stageRoots.get(stageId); + Preconditions.checkState(result != null, "Mailbox not found for stageId: %s", stageId); + return result; + } + + /** + * Returns a builder that can be used to create a new mailbox send node. + * + * Whenever this builder is used to create a node, the created node will be added to the list of mailboxes. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxSendNode> sendMailbox( + int newStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode input = childBuilder.build(stageId); + MailboxSendNode mailboxSendNode = new MailboxSendNode(newStageId, mySchema, List.of(input), stageId, null, null, + null, false, null, false); + MailboxSendNode old = _stageRoots.put(stageId, mailboxSendNode); + Preconditions.checkState(old == null, "Mailbox already exists for stageId: %s", stageId); + return mailboxSendNode; + }; + } + + /** + * A builder that can be used to create a child node. + * + * It is not expected for test writers to implement this class. Instead it is recommended to use methods like + * {@link #exchange(int, SimpleChildBuilder)}, {@link #join(SimpleChildBuilder, SimpleChildBuilder)}, + * {@link #tableScan(String)} and others to chain instances of this class. + */ + public interface ChildBuilder<P extends PlanNode> { + P build(int stageId, @Nullable DataSchema dataSchema, @Nullable PlanNode.NodeHint hints); Review Comment: This isn't currently being used anywhere directly? All external uses seem to be through `SimpleChildBuilder`? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/GroupedStages.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.NoSuchElementException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.pinot.query.planner.plannode.BasePlanNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; + + +/** + * This represents a mathematical partition of the stages in a query plan, grouping the stages in sets of disjoint + * stages. + * + * It is important to understand that this class assumes all stages that are stored belong to the same query plan + * and therefore their stage ids are unique. It also assumes that the same stage instances are being used when + * methods like {@link #containsStage(MailboxSendNode)} are called. + * + * The original reason to have this class was to group equivalent stages together, although it can be used for other + * purposes. + * + * Although the only implementation provided so far ({@link Mutable}) is mutable, the class is designed + * to be immutable from the outside. This is because it is difficult to manipulate grouped stages directly without + * breaking the invariants of the class, so it is better to be sure it is not modified after it is calculated. + */ +public abstract class GroupedStages { + + public static final Comparator<MailboxSendNode> STAGE_COMPARATOR = Comparator.comparing(BasePlanNode::getStageId); + public static final Comparator<SortedSet<MailboxSendNode>> GROUP_COMPARATOR + = Comparator.comparing(group -> group.first().getStageId()); + + public abstract boolean containsStage(MailboxSendNode stage); + + /** + * Returns the group of equivalent stages that contains the given stage. + * + * The set is sorted by the stage id. + */ + public abstract SortedSet<MailboxSendNode> getGroup(MailboxSendNode stage) + throws NoSuchElementException; + + /** + * Returns the leaders of each group. + * + * The leader of a group is the stage with the smallest stage id in the group. + */ + public abstract SortedSet<MailboxSendNode> getLeaders(); + + /** + * Returns the groups. + * + * Each set contains the stages that are grouped. These sets are disjoint. The union of these sets is the set of all + * stages known by this object. + * + * The result is sorted by the leader of each group and each group is sorted by the stage id. + */ + public abstract SortedSet<SortedSet<MailboxSendNode>> getGroups(); + + @Override + public String toString() { + String content = getGroups().stream() + .map(group -> + "[" + group.stream() + .map(stage -> Integer.toString(stage.getStageId())) + .collect(Collectors.joining(", ")) + "]" + ) + .collect(Collectors.joining(", ")); + + return "[" + content + "]"; + } + + /** + * A mutable version of {@link GroupedStages}. + */ + public static class Mutable extends GroupedStages { + /** + * All groups of stages. + * + * Although these groups are never empty, a group may contain only one stage if it is not grouped with any other + * stage. + */ + private final SortedSet<SortedSet<MailboxSendNode>> _groups = new TreeSet<>(GROUP_COMPARATOR); + + /** + * Map from stage to the group of stages it belongs to. + */ + private final IdentityHashMap<MailboxSendNode, SortedSet<MailboxSendNode>> _stageToGroup = new IdentityHashMap<>(); Review Comment: Makes sense, but shouldn't these structures be very tiny since they're representing stages in a single query - in the order of 10s or maybe 100s of elements at most in very extreme cases? ########## pinot-query-planner/src/test/java/org/apache/pinot/query/planner/logical/StagesTestBase.java: ########## @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.logical; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.testng.annotations.AfterMethod; + + +/** + * A base test class that can be used to write tests for stages using a fluent DSL. + * + * This class provides two features: + * <ul> + * <li>Builders that can be used to create nodes in a fluent way.</li> + * <li>Access to the stages that were created during the test with {@link #stage(int)}.</li> + * </ul> + * + * It is expected that each test method will call {@link #when(SimpleChildBuilder)} to create a new plan, which will + * populate the list of stages. After that, the test can look for the stages with the {@link #stage(int)} method and + * assert the expected behavior. + */ +public class StagesTestBase { + private final HashMap<Integer, MailboxSendNode> _stageRoots = new HashMap<>(); + + /** + * Clears the list of stages. + * + * This method is automatically called by the test framework, ensuring each test starts with a clean slate. + * This method can also be called in middle of the test, but that is not recommended given it usually means that the + * test is getting too complex and difficult to read and/or get insights from it in case of failure. + */ + @AfterMethod + public void setUp() { + _stageRoots.clear(); + } + + /** + * Creates a new plan that will have an initial stage. + * + * The stage will have a default {@link MailboxSendNode} whose stage will be 0 and its child the one created by the + * builder. + * + * Notice that this method does not offer any way to customize the initial send mailbox. + */ + public MailboxSendNode when(SimpleChildBuilder<? extends PlanNode> builder) { + return sendMailbox(0, builder).build(0); + } + + /** + * Returns a builder that can be used to create a new mailbox receive node. + * + * It is usually recommended to use {@link #exchange(int, SimpleChildBuilder)} instead of this method, given that + * {@code exchange} creates a pair of send and receive mailboxes and deals with the stageId management. + */ + public SimpleChildBuilder<MailboxReceiveNode> receiveMailbox(SimpleChildBuilder<MailboxSendNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + MailboxSendNode mailbox = childBuilder.build(stageId); + int nextStageId = mailbox.getStageId(); + return new MailboxReceiveNode(stageId, mySchema, List.of(), nextStageId, null, null, null, null, false, false, + mailbox); + }; + } + + /** + * Creates a join node that will have the left and right nodes as children. + * + * The join type will be {@link JoinRelType#FULL}, the join strategy will be {@link JoinNode.JoinStrategy#HASH} and + * there will be no conditions. If custom joins are needed feel free to add more builder methods or create your own + * instance of {@link SimpleChildBuilder}. + */ + public SimpleChildBuilder<JoinNode> join( + SimpleChildBuilder<? extends PlanNode> leftBuilder, + SimpleChildBuilder<? extends PlanNode> rightBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode left = leftBuilder.build(stageId); + PlanNode right = rightBuilder.build(stageId); + return new JoinNode(stageId, mySchema, myHints, List.of(left, right), JoinRelType.FULL, + Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), JoinNode.JoinStrategy.HASH); + }; + } + + /** + * Creates a pair of receiver and sender nodes that will be logically connected. + * + * Whenever this builder is used to create a node, the mailbox send node will be added to the list of mailboxes. + * + * Although there are builder methods to create send and receive mailboxes separately, this method is recommended + * because it deals with the stageId management and creates tests that are easier to read. + */ + public SimpleChildBuilder<MailboxReceiveNode> exchange( + int nextStageId, SimpleChildBuilder<? extends PlanNode> childBuilder) { + return (stageId, mySchema, myHints) -> { + PlanNode input = childBuilder.build(stageId); Review Comment: I think this should be `childBuilder.build(nextStageId)` right? The child is the input to the `MailboxSendNode` and must be in the same stage as it. For instance, in `EquivalentStagesFinderTest`, a test like ``` when( join( exchange(1, tableScan("T1")), exchange(2, tableScan("T2")) ) ); ``` results in the join node as well as both the table scan nodes being in stage 0, but the table scan nodes should be in stages 1 and 2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org