siddharthteotia commented on a change in pull request #8340:
URL: https://github.com/apache/pinot/pull/8340#discussion_r829187138



##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Stage Node is a serializable version of the {@link 
org.apache.calcite.rel.RelNode}.
+ *
+ * It represents the relationship of the current node and also the downstream 
inputs.
+ */
+public interface StageNode extends Serializable {

Review comment:
       I am a little confused by `StageNode`.
   
   Is this supposed to be the interface for all nodes (leaf, non-leaf, etc.) 
within a stage? If so, then I think this makes sense. My confusion is whether 
this instead represents purely the stage boundary, a.k.a. `ExchangeNode`.

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
##########
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import java.util.Collection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.PlannerImpl;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.parser.CalciteSqlParser;
+import org.apache.pinot.query.planner.LogicalPlanner;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StagePlanner;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.rules.PinotQueryRuleSets;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.validate.Validator;
+
+
+/**
+ * The {@code QueryEnvironment} contains the main entrypoint for query 
planning.
+ *
+ * <p>It provide the higher level entry interface to convert a SQL string into 
a {@link QueryPlan}.
+ */
+public class QueryEnvironment {
+  private final RelDataTypeFactory _typeFactory;
+  private final CalciteSchema _rootSchema;
+  private final WorkerManager _workerManager;
+  private final FrameworkConfig _config;
+
+  private final PlannerImpl _planner;
+  private final SqlValidator _validator;
+  private final Prepare.CatalogReader _catalogReader;
+  private final Collection<RelOptRule> _logicalRuleSet;
+  private final RelOptPlanner _relOptPlanner;
+
+  public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, 
WorkerManager workerManager) {
+    _typeFactory = typeFactory;
+    _rootSchema = rootSchema;
+    _workerManager = workerManager;
+    _config = Frameworks.newConfigBuilder().traitDefs().build();
+
+    // this is only here as a placeholder for SqlToRelConverter expandView 
implementation.
+    _planner = new PlannerImpl(_config);
+
+    // catalog
+    _catalogReader = new CalciteCatalogReader(_rootSchema, 
_rootSchema.path(null), _typeFactory, null);
+    _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, 
_typeFactory);
+
+    // optimizer rules
+    _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;
+
+    // optimizer
+    HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
+    for (RelOptRule relOptRule : _logicalRuleSet) {
+      hepProgramBuilder.addRuleInstance(relOptRule);
+    }
+    _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(), 
Contexts.EMPTY_CONTEXT);
+  }
+
+  public QueryPlan sqlQuery(String sqlQuery) {
+    PlannerContext plannerContext = new PlannerContext();
+    try {
+      SqlNode parsed = this.parse(sqlQuery, plannerContext);
+      SqlNode validated = this.validate(parsed);
+      RelRoot relation = this.toRelation(validated, plannerContext);
+      RelNode optimized = this.optimize(relation, plannerContext);
+      return this.toQuery(optimized, plannerContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Error composing query plan", e);

Review comment:
       Include SQL query in the exception message ?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+
+
+public class MailboxSendNode extends AbstractStageNode {

Review comment:
       Based on the [discussion thread in design 
doc](https://docs.google.com/document/d/1iqzk54prgwIK59V3ELGoANs6h4v6ypHJkqMfiUhoI_I/edit?disco=AAAAWdz_Rfo),
 I think we should have an abstraction of ExchangeNode. ExchangeNode should 
encapsulate sender and receiver node. 
   
   Similarly, there should be an abstraction for sender and receiver themselves.
   
   Something like the following:
   
   Exchange
   - BroadcastExchange
   - SingleMergeExchange
   - HashPartitionExchange
   
   Sender
   - BroadcastSender
   - SingleSender
   - HashPartitionSender
   
   Receiver
   - OrderedReceiver
   - UnorderedReceiver
   
   BroadcastExchange encapsulates
   - BroadcastSender
   - SomeReceiver
   
   HashPartitionExchange encapsulates
   - HashPartitionSender
   - SomeReceiver
   
   
   So ideally, MailboxSend and MailboxReceive should be modeled as sender and 
receiver abstractions, respectively, as opposed to concrete implementations, 
imo.
   
   
   

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+
+public class TableScanNode extends AbstractStageNode {
+  private final List<String> _tableName;

Review comment:
       Why is this a list ? Shouldn't there be a single table per 
`TableScanNode` in the plan ?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+
+public class TableScanNode extends AbstractStageNode {
+  private final List<String> _tableName;
+  private final List<String> _tableScanColumns;
+
+  public TableScanNode(LogicalTableScan tableScan, String stageId) {
+    super(stageId);
+    _tableName = tableScan.getTable().getQualifiedName();
+    // TODO: optimize this, table field is not directly usable as name.
+    _tableScanColumns =
+        
tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<StageNode> getInputs() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void addInput(StageNode queryStageRoot) {
+    throw new UnsupportedOperationException("table scan cannot add input");

Review comment:
       (nit) suggest changing the message to `TableScanNode cannot add input as 
it is a leaf node` ?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+  protected final String _stageId;

Review comment:
       Should this be an `int` (to reduce heap usage)? Or do we think the stage 
ID can be arbitrary bytes, so that `String` is better?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+  protected final String _stageId;

Review comment:
       Should this be an `int` (to reduce heap usage)? Or do we think the stage 
ID can be arbitrary bytes, so that `String` is better?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+  protected final String _stageId;
+  protected final List<StageNode> _inputs;
+
+  public AbstractStageNode(String stageId) {

Review comment:
       I think the base class constructor should also take a `RelNode` as an 
argument?
   
   We do `queryStagePlanner.makePlan(relRoot)` -> relRoot is the `RelNode` 
output from `logicalPlanner.findBestExp()`.
   
   So when the `StageNode` is being created, it has to be created from a 
corresponding calcite logical `RelNode` ?
   
   I think this is true even for ExchangeNode because they will be created from 
`LogicalExchange`

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+  protected final String _stageId;
+  protected final List<StageNode> _inputs;
+
+  public AbstractStageNode(String stageId) {

Review comment:
       We do `queryStagePlanner.makePlan(relRoot)` -> relRoot is the `RelNode` 
output from `logicalPlanner.findBestExp()`.
   
   So when the `StageNode` is being created, it has to be created from a 
corresponding calcite logical `RelNode` ? In that case, I think the base class 
constructor should also have a `RelNode` as the argument to enforce this 
behavior ?
   
   I think this is true even for ExchangeNode because they will be created from 
`LogicalExchange`

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageNodeConverter.java
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.pinot.query.planner.nodes.CalcNode;
+import org.apache.pinot.query.planner.nodes.JoinNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code StageNodeConverter} converts a logical {@link RelNode} to a 
{@link StageNode}.
+ */
+public final class StageNodeConverter {

Review comment:
       (nit) suggest renaming to `CalciteLogicalToPinotStageNodeConverter` or 
something similar ?

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.routing.WorkerManager;
+
+
+/**
+ * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest 
of trees with {@link StageNode}.
+ *
+ * This class is non-threadsafe. Do not reuse the stage planner for multiple 
query plans.
+ */
+public class StagePlanner {
+  private final PlannerContext _plannerContext;
+  private final WorkerManager _workerManager;
+
+  private Map<String, StageNode> _queryStageMap;
+  private Map<String, StageMetadata> _stageMetadataMap;
+  private int _stageIdCounter;
+
+  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
+    _plannerContext = plannerContext;
+    _workerManager = workerManager;
+  }
+
+  public QueryPlan makePlan(RelNode relRoot) {
+    // clear the state
+    _queryStageMap = new HashMap<>();
+    _stageMetadataMap = new HashMap<>();
+    _stageIdCounter = 0;
+
+    // walk the plan and create stages.
+    StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+
+    // global root needs to send results back to the ROOT, a.k.a. the client 
response node.
+    // the last stage is always a broadcast-gather.
+    StageNode globalReceiverNode =
+        new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, 
globalReceiverNode.getStageId(),
+        RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
+    StageMetadata stageMetadata = 
_stageMetadataMap.get(globalSenderNode.getStageId());
+    stageMetadata.attach(globalSenderNode);
+
+    _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
+    StageMetadata globalReceivingStageMetadata = new StageMetadata();
+    globalReceivingStageMetadata.attach(globalReceiverNode);
+    _stageMetadataMap.put(globalReceiverNode.getStageId(), 
globalReceivingStageMetadata);
+
+    // assign workers to each stage.
+    for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+    }
+
+    return new QueryPlan(_queryStageMap, _stageMetadataMap);
+  }
+
+  // non-threadsafe
+  private StageNode walkRelPlan(RelNode node, String currentStageId) {
+    if (isExchangeNode(node)) {
+      // 1. exchangeNode always have only one input, get its input converted 
as a new stage root.
+      StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
+      RelDistribution.Type exchangeType = ((LogicalExchange) 
node).distribution.getType();
+
+      // 2. make an exchange sender and receiver node pair
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getStageId(), exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot, 
mailboxReceiver.getStageId(), exchangeType);
+
+      // 3. put the sender side as a completed stage.
+      _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
+
+      // 4. return the receiver (this is considered as a "virtual table scan" 
node for its parent.
+      return mailboxReceiver;
+    } else {
+      StageNode stageNode = StageNodeConverter.toStageNode(node, 
currentStageId);
+      List<RelNode> inputs = node.getInputs();
+      for (RelNode input : inputs) {
+        stageNode.addInput(walkRelPlan(input, currentStageId));
+      }
+      StageMetadata stageMetadata = 
_stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata());
+      stageMetadata.attach(stageNode);
+      return stageNode;
+    }
+  }
+
+  private boolean isExchangeNode(RelNode node) {
+    return (node instanceof LogicalExchange);
+  }
+
+  private String getNewStageId() {
+    return String.valueOf(_stageIdCounter++);

Review comment:
       I suggest using an `int` as the stageId to reduce the heap usage of 
`StageNode` objects, especially because the code is only converting an int 
counter to a String.

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.routing.WorkerManager;
+
+
+/**
+ * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest 
of trees with {@link StageNode}.
+ *
+ * This class is non-threadsafe. Do not reuse the stage planner for multiple 
query plans.
+ */
+public class StagePlanner {
+  private final PlannerContext _plannerContext;
+  private final WorkerManager _workerManager;
+
+  private Map<String, StageNode> _queryStageMap;
+  private Map<String, StageMetadata> _stageMetadataMap;
+  private int _stageIdCounter;
+
+  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
+    _plannerContext = plannerContext;
+    _workerManager = workerManager;
+  }
+
+  public QueryPlan makePlan(RelNode relRoot) {
+    // clear the state
+    _queryStageMap = new HashMap<>();
+    _stageMetadataMap = new HashMap<>();
+    _stageIdCounter = 0;
+
+    // walk the plan and create stages.
+    StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+
+    // global root needs to send results back to the ROOT, a.k.a. the client 
response node.
+    // the last stage is always a broadcast-gather.
+    StageNode globalReceiverNode =
+        new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, 
globalReceiverNode.getStageId(),
+        RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
+    StageMetadata stageMetadata = 
_stageMetadataMap.get(globalSenderNode.getStageId());
+    stageMetadata.attach(globalSenderNode);
+
+    _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
+    StageMetadata globalReceivingStageMetadata = new StageMetadata();
+    globalReceivingStageMetadata.attach(globalReceiverNode);
+    _stageMetadataMap.put(globalReceiverNode.getStageId(), 
globalReceivingStageMetadata);
+
+    // assign workers to each stage.
+    for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+    }
+
+    return new QueryPlan(_queryStageMap, _stageMetadataMap);
+  }
+
+  // non-threadsafe
+  private StageNode walkRelPlan(RelNode node, String currentStageId) {
+    if (isExchangeNode(node)) {
+      // 1. exchangeNode always have only one input, get its input converted 
as a new stage root.
+      StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
+      RelDistribution.Type exchangeType = ((LogicalExchange) 
node).distribution.getType();
+
+      // 2. make an exchange sender and receiver node pair
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getStageId(), exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot, 
mailboxReceiver.getStageId(), exchangeType);
+
+      // 3. put the sender side as a completed stage.
+      _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
+
+      // 4. return the receiver (this is considered as a "virtual table scan" 
node for its parent.
+      return mailboxReceiver;
+    } else {
+      StageNode stageNode = StageNodeConverter.toStageNode(node, 
currentStageId);
+      List<RelNode> inputs = node.getInputs();
+      for (RelNode input : inputs) {
+        stageNode.addInput(walkRelPlan(input, currentStageId));
+      }
+      StageMetadata stageMetadata = 
_stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata());
+      stageMetadata.attach(stageNode);
+      return stageNode;
+    }
+  }
+
+  private boolean isExchangeNode(RelNode node) {
+    return (node instanceof LogicalExchange);
+  }
+
+  private String getNewStageId() {
+    return String.valueOf(_stageIdCounter++);

Review comment:
       I suggest using an `int` as the stageId to reduce the heap usage of 
`StageNode` objects, especially because the code is only converting an int 
counter to a String, so we might as well use the `int` directly.

##########
File path: 
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.routing.WorkerManager;
+
+
+/**
+ * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest 
of trees with {@link StageNode}.
+ *
+ * This class is non-threadsafe. Do not reuse the stage planner for multiple 
query plans.
+ */
+public class StagePlanner {
+  private final PlannerContext _plannerContext;
+  private final WorkerManager _workerManager;
+
+  private Map<String, StageNode> _queryStageMap;
+  private Map<String, StageMetadata> _stageMetadataMap;
+  private int _stageIdCounter;
+
+  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
+    _plannerContext = plannerContext;
+    _workerManager = workerManager;
+  }
+
+  public QueryPlan makePlan(RelNode relRoot) {
+    // clear the state
+    _queryStageMap = new HashMap<>();
+    _stageMetadataMap = new HashMap<>();
+    _stageIdCounter = 0;
+
+    // walk the plan and create stages.
+    StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+
+    // global root needs to send results back to the ROOT, a.k.a. the client 
response node.
+    // the last stage is always a broadcast-gather.
+    StageNode globalReceiverNode =
+        new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, 
globalReceiverNode.getStageId(),
+        RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
+    StageMetadata stageMetadata = 
_stageMetadataMap.get(globalSenderNode.getStageId());
+    stageMetadata.attach(globalSenderNode);
+
+    _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
+    StageMetadata globalReceivingStageMetadata = new StageMetadata();
+    globalReceivingStageMetadata.attach(globalReceiverNode);
+    _stageMetadataMap.put(globalReceiverNode.getStageId(), 
globalReceivingStageMetadata);
+
+    // assign workers to each stage.
+    for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+    }
+
+    return new QueryPlan(_queryStageMap, _stageMetadataMap);
+  }
+
+  // non-threadsafe

Review comment:
       I had a thread-safety-related question on `QueryEnvironment`. If that 
class is instantiated per compiled query, does that imply that calls to 
`StagePlanner` should be thread safe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to