siddharthteotia commented on a change in pull request #8340: URL: https://github.com/apache/pinot/pull/8340#discussion_r829187138
########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java ########## @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.io.Serializable; +import java.util.List; + + +/** + * Stage Node is a serializable version of the {@link org.apache.calcite.rel.RelNode}. + * + * It represents the relationship of the current node and also the downstream inputs. + */ +public interface StageNode extends Serializable { Review comment: I am a little confused by `StageNode`. Is this supposed to be the interface for all nodes (leaf / non-leaf etc) within a stage ? If so, then I think this makes sense. My confusion is if this represents purely the stage boundary aka `ExchangeNode` ? ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java ########## @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query; + +import java.util.Collection; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.prepare.PlannerImpl; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.sql2rel.StandardConvertletTable; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.parser.CalciteSqlParser; +import org.apache.pinot.query.planner.LogicalPlanner; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.StagePlanner; +import 
org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.rules.PinotQueryRuleSets; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.validate.Validator; + + +/** + * The {@code QueryEnvironment} contains the main entrypoint for query planning. + * + * <p>It provide the higher level entry interface to convert a SQL string into a {@link QueryPlan}. + */ +public class QueryEnvironment { + private final RelDataTypeFactory _typeFactory; + private final CalciteSchema _rootSchema; + private final WorkerManager _workerManager; + private final FrameworkConfig _config; + + private final PlannerImpl _planner; + private final SqlValidator _validator; + private final Prepare.CatalogReader _catalogReader; + private final Collection<RelOptRule> _logicalRuleSet; + private final RelOptPlanner _relOptPlanner; + + public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, WorkerManager workerManager) { + _typeFactory = typeFactory; + _rootSchema = rootSchema; + _workerManager = workerManager; + _config = Frameworks.newConfigBuilder().traitDefs().build(); + + // this is only here as a placeholder for SqlToRelConverter expandView implementation. 
+ _planner = new PlannerImpl(_config); + + // catalog + _catalogReader = new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, null); + _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, _typeFactory); + + // optimizer rules + _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES; + + // optimizer + HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); + for (RelOptRule relOptRule : _logicalRuleSet) { + hepProgramBuilder.addRuleInstance(relOptRule); + } + _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(), Contexts.EMPTY_CONTEXT); + } + + public QueryPlan sqlQuery(String sqlQuery) { + PlannerContext plannerContext = new PlannerContext(); + try { + SqlNode parsed = this.parse(sqlQuery, plannerContext); + SqlNode validated = this.validate(parsed); + RelRoot relation = this.toRelation(validated, plannerContext); + RelNode optimized = this.optimize(relation, plannerContext); + return this.toQuery(optimized, plannerContext); + } catch (Exception e) { + throw new RuntimeException("Error composing query plan", e); Review comment: Include SQL query in the exception message ? ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java ########## @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import org.apache.calcite.rel.RelDistribution; + + +public class MailboxSendNode extends AbstractStageNode { Review comment: Based on the [discussion thread in design doc](https://docs.google.com/document/d/1iqzk54prgwIK59V3ELGoANs6h4v6ypHJkqMfiUhoI_I/edit?disco=AAAAWdz_Rfo), I think we should have an abstraction of ExchangeNode. ExchangeNode should encapsulate sender and receiver node. Similarly, there should be an abstraction for sender and receiver themselves. Something like following.... Exchange - BroadcastExchange - SingleMergeExchange - HashPartitionExchange Sender - BroadcastSender - SingleSender - HashPartitionSender Receiver - OrderedReceiver - UnorderedReceiver BroadcastExchange encapsulates - BroadcastSender - SomeReceiver HashPartitionExchange encapsulates - HashPartitionSender - SomeReceiver So ideally MailboxSend and MailboxReceive should be modeled as sender and receiver abstractions respectively as opposed to concrete implementations imo ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java ########## @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.type.RelDataTypeField; + + +public class TableScanNode extends AbstractStageNode { + private final List<String> _tableName; Review comment: Why is this a list ? Shouldn't there be a single table per `TableScanNode` in the plan ? ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java ########## @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.type.RelDataTypeField; + + +public class TableScanNode extends AbstractStageNode { + private final List<String> _tableName; + private final List<String> _tableScanColumns; + + public TableScanNode(LogicalTableScan tableScan, String stageId) { + super(stageId); + _tableName = tableScan.getTable().getQualifiedName(); + // TODO: optimize this, table field is not directly usable as name. + _tableScanColumns = + tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList()); + } + + @Override + public List<StageNode> getInputs() { + return Collections.emptyList(); + } + + @Override + public void addInput(StageNode queryStageRoot) { + throw new UnsupportedOperationException("table scan cannot add input"); Review comment: (nit) suggest changing the message to `TableScanNode cannot add input as it is a leaf node` ? ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.ArrayList; +import java.util.List; + + +public abstract class AbstractStageNode implements StageNode { + + protected final String _stageId; Review comment: int (to reduce heap usage) ? or do we think this is arbitrary bytes and String is better ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.ArrayList; +import java.util.List; + + +public abstract class AbstractStageNode implements StageNode { + + protected final String _stageId; Review comment: int (to reduce heap usage) ? 
or do we think this is arbitrary bytes and String is better ? ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.ArrayList; +import java.util.List; + + +public abstract class AbstractStageNode implements StageNode { + + protected final String _stageId; + protected final List<StageNode> _inputs; + + public AbstractStageNode(String stageId) { Review comment: I think the base class constructor should also have a `RelNode` as the argument ? We do `queryStagePlanner.makePlan(relRoot)` -> relRoot is the `RelNode` output from `logicalPlanner.findBestExp()`. So when the `StageNode` is being created, it has to be created from a corresponding calcite logical `RelNode` ? 
I think this is true even for an ExchangeNode, because it will be created from a `LogicalExchange`. ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.nodes; + +import java.util.ArrayList; +import java.util.List; + + +public abstract class AbstractStageNode implements StageNode { + + protected final String _stageId; + protected final List<StageNode> _inputs; + + public AbstractStageNode(String stageId) { Review comment: We do `queryStagePlanner.makePlan(relRoot)` -> relRoot is the `RelNode` output from `logicalPlanner.findBestExp()`. So when the `StageNode` is being created, it has to be created from a corresponding Calcite logical `RelNode`? In that case, I think the base class constructor should also have a `RelNode` as the argument to enforce this behavior?
I think this is true even for an ExchangeNode, because it will be created from a `LogicalExchange`. ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageNodeConverter.java ########## @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.pinot.query.planner.nodes.CalcNode; +import org.apache.pinot.query.planner.nodes.JoinNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.planner.nodes.TableScanNode; + + +/** + * The {@code StageNodeConverter} converts a logical {@link RelNode} to a {@link StageNode}. + */ +public final class StageNodeConverter { Review comment: (nit) Suggest renaming to `CalciteLogicalToPinotStageNodeConverter` or something similar?
########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java ########## @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; +import org.apache.pinot.query.planner.nodes.MailboxSendNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.routing.WorkerManager; + + +/** + * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}. + * + * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. 
+ */ +public class StagePlanner { + private final PlannerContext _plannerContext; + private final WorkerManager _workerManager; + + private Map<String, StageNode> _queryStageMap; + private Map<String, StageMetadata> _stageMetadataMap; + private int _stageIdCounter; + + public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) { + _plannerContext = plannerContext; + _workerManager = workerManager; + } + + public QueryPlan makePlan(RelNode relRoot) { + // clear the state + _queryStageMap = new HashMap<>(); + _stageMetadataMap = new HashMap<>(); + _stageIdCounter = 0; + + // walk the plan and create stages. + StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId()); + + // global root needs to send results back to the ROOT, a.k.a. the client response node. + // the last stage is always a broadcast-gather. + StageNode globalReceiverNode = + new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED); + StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, globalReceiverNode.getStageId(), + RelDistribution.Type.BROADCAST_DISTRIBUTED); + _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode); + StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId()); + stageMetadata.attach(globalSenderNode); + + _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode); + StageMetadata globalReceivingStageMetadata = new StageMetadata(); + globalReceivingStageMetadata.attach(globalReceiverNode); + _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata); + + // assign workers to each stage. 
+ for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) { + _workerManager.assignWorkerToStage(e.getKey(), e.getValue()); + } + + return new QueryPlan(_queryStageMap, _stageMetadataMap); + } + + // non-threadsafe + private StageNode walkRelPlan(RelNode node, String currentStageId) { + if (isExchangeNode(node)) { + // 1. exchangeNode always have only one input, get its input converted as a new stage root. + StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId()); + RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType(); + + // 2. make an exchange sender and receiver node pair + StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType); + StageNode mailboxSender = new MailboxSendNode(nextStageRoot, mailboxReceiver.getStageId(), exchangeType); + + // 3. put the sender side as a completed stage. + _queryStageMap.put(mailboxSender.getStageId(), mailboxSender); + + // 4. return the receiver (this is considered as a "virtual table scan" node for its parent. 
+ return mailboxReceiver; + } else { + StageNode stageNode = StageNodeConverter.toStageNode(node, currentStageId); + List<RelNode> inputs = node.getInputs(); + for (RelNode input : inputs) { + stageNode.addInput(walkRelPlan(input, currentStageId)); + } + StageMetadata stageMetadata = _stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata()); + stageMetadata.attach(stageNode); + return stageNode; + } + } + + private boolean isExchangeNode(RelNode node) { + return (node instanceof LogicalExchange); + } + + private String getNewStageId() { + return String.valueOf(_stageIdCounter++); Review comment: I suggest using INT as stageId to play nice with heap usage of such StageNode objects esp because the code is converting counter to string ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java ########## @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; +import org.apache.pinot.query.planner.nodes.MailboxSendNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.routing.WorkerManager; + + +/** + * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}. + * + * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. + */ +public class StagePlanner { + private final PlannerContext _plannerContext; + private final WorkerManager _workerManager; + + private Map<String, StageNode> _queryStageMap; + private Map<String, StageMetadata> _stageMetadataMap; + private int _stageIdCounter; + + public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) { + _plannerContext = plannerContext; + _workerManager = workerManager; + } + + public QueryPlan makePlan(RelNode relRoot) { + // clear the state + _queryStageMap = new HashMap<>(); + _stageMetadataMap = new HashMap<>(); + _stageIdCounter = 0; + + // walk the plan and create stages. + StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId()); + + // global root needs to send results back to the ROOT, a.k.a. the client response node. + // the last stage is always a broadcast-gather. 
+ StageNode globalReceiverNode = + new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED); + StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, globalReceiverNode.getStageId(), + RelDistribution.Type.BROADCAST_DISTRIBUTED); + _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode); + StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId()); + stageMetadata.attach(globalSenderNode); + + _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode); + StageMetadata globalReceivingStageMetadata = new StageMetadata(); + globalReceivingStageMetadata.attach(globalReceiverNode); + _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata); + + // assign workers to each stage. + for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) { + _workerManager.assignWorkerToStage(e.getKey(), e.getValue()); + } + + return new QueryPlan(_queryStageMap, _stageMetadataMap); + } + + // non-threadsafe + private StageNode walkRelPlan(RelNode node, String currentStageId) { + if (isExchangeNode(node)) { + // 1. exchangeNode always have only one input, get its input converted as a new stage root. + StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId()); + RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType(); + + // 2. make an exchange sender and receiver node pair + StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType); + StageNode mailboxSender = new MailboxSendNode(nextStageRoot, mailboxReceiver.getStageId(), exchangeType); + + // 3. put the sender side as a completed stage. + _queryStageMap.put(mailboxSender.getStageId(), mailboxSender); + + // 4. return the receiver (this is considered as a "virtual table scan" node for its parent. 
+ return mailboxReceiver; + } else { + StageNode stageNode = StageNodeConverter.toStageNode(node, currentStageId); + List<RelNode> inputs = node.getInputs(); + for (RelNode input : inputs) { + stageNode.addInput(walkRelPlan(input, currentStageId)); + } + StageMetadata stageMetadata = _stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata()); + stageMetadata.attach(stageNode); + return stageNode; + } + } + + private boolean isExchangeNode(RelNode node) { + return (node instanceof LogicalExchange); + } + + private String getNewStageId() { + return String.valueOf(_stageIdCounter++); Review comment: I suggest using INT as stageId to play nice with heap usage of such StageNode objects esp because the code is converting counter to string so might as well use INT ########## File path: pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java ########## @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.logical.LogicalExchange; +import org.apache.pinot.query.context.PlannerContext; +import org.apache.pinot.query.planner.nodes.MailboxReceiveNode; +import org.apache.pinot.query.planner.nodes.MailboxSendNode; +import org.apache.pinot.query.planner.nodes.StageNode; +import org.apache.pinot.query.routing.WorkerManager; + + +/** + * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}. + * + * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. + */ +public class StagePlanner { + private final PlannerContext _plannerContext; + private final WorkerManager _workerManager; + + private Map<String, StageNode> _queryStageMap; + private Map<String, StageMetadata> _stageMetadataMap; + private int _stageIdCounter; + + public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) { + _plannerContext = plannerContext; + _workerManager = workerManager; + } + + public QueryPlan makePlan(RelNode relRoot) { + // clear the state + _queryStageMap = new HashMap<>(); + _stageMetadataMap = new HashMap<>(); + _stageIdCounter = 0; + + // walk the plan and create stages. + StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId()); + + // global root needs to send results back to the ROOT, a.k.a. the client response node. + // the last stage is always a broadcast-gather. 
+ StageNode globalReceiverNode = + new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), RelDistribution.Type.BROADCAST_DISTRIBUTED); + StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, globalReceiverNode.getStageId(), + RelDistribution.Type.BROADCAST_DISTRIBUTED); + _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode); + StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId()); + stageMetadata.attach(globalSenderNode); + + _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode); + StageMetadata globalReceivingStageMetadata = new StageMetadata(); + globalReceivingStageMetadata.attach(globalReceiverNode); + _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata); + + // assign workers to each stage. + for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) { + _workerManager.assignWorkerToStage(e.getKey(), e.getValue()); + } + + return new QueryPlan(_queryStageMap, _stageMetadataMap); + } + + // non-threadsafe Review comment: I had a thread safety related question on QueryEnvironment. If that class is instantiated per compiled query, then it implies calls to StagePlanner should be thread safe ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org