This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 8c8380005b6d833ee9d20ad83487aac1439e0019
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Mar 24 21:13:51 2022 -0700

    Add pinot-query-planner module (#8340)
    
    * add pinot-query-planner
    
    - fix calcite upgrade compilation issue
    - fix query compilation runtime after calcite 1.29 upgrade
    - linter
    
    * address diff comments and add more TODOs
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/common/config/provider/TableCache.java   |  10 +
 pinot-query-planner/pom.xml                        |  92 ++++
 .../apache/calcite/jdbc/CalciteSchemaBuilder.java  |  52 +++
 .../org/apache/pinot/query/QueryEnvironment.java   | 181 ++++++++
 .../apache/pinot/query/catalog/PinotCatalog.java   | 122 +++++
 .../org/apache/pinot/query/catalog/PinotTable.java |  61 +++
 .../apache/pinot/query/context/PlannerContext.java |  40 ++
 .../query/parser/CalciteExpressionParser.java      | 502 +++++++++++++++++++++
 .../pinot/query/parser/CalciteSqlParser.java       | 148 ++++++
 .../org/apache/pinot/query/parser/ParserUtils.java |  63 +++
 .../apache/pinot/query/parser/QueryRewriter.java   |  46 ++
 .../apache/pinot/query/planner/LogicalPlanner.java |  63 +++
 .../org/apache/pinot/query/planner/QueryPlan.java  |  60 +++
 .../pinot/query/planner/RelToStageConverter.java   |  71 +++
 .../apache/pinot/query/planner/StageMetadata.java  |  85 ++++
 .../apache/pinot/query/planner/StagePlanner.java   | 126 ++++++
 .../query/planner/nodes/AbstractStageNode.java     |  49 ++
 .../apache/pinot/query/planner/nodes/CalcNode.java |  40 ++
 .../apache/pinot/query/planner/nodes/JoinNode.java |  86 ++++
 .../query/planner/nodes/MailboxReceiveNode.java    |  54 +++
 .../pinot/query/planner/nodes/MailboxSendNode.java |  55 +++
 .../pinot/query/planner/nodes/StageNode.java       |  40 ++
 .../pinot/query/planner/nodes/TableScanNode.java   |  57 +++
 .../partitioning/FieldSelectionKeySelector.java    |  39 ++
 .../query/planner/partitioning/KeySelector.java    |  37 ++
 .../apache/pinot/query/routing/WorkerInstance.java |  51 +++
 .../apache/pinot/query/routing/WorkerManager.java  |  95 ++++
 .../query/rules/PinotExchangeNodeInsertRule.java   |  82 ++++
 .../pinot/query/rules/PinotQueryRuleSets.java      |  91 ++++
 .../org/apache/pinot/query/type/TypeFactory.java   |  84 ++++
 .../org/apache/pinot/query/type/TypeSystem.java    |  30 ++
 .../org/apache/pinot/query/validate/Validator.java |  36 ++
 .../apache/pinot/query/QueryEnvironmentTest.java   | 122 +++++
 .../pinot/query/QueryEnvironmentTestUtils.java     | 131 ++++++
 pom.xml                                            |   6 +-
 35 files changed, 2903 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 17fbf73cd3..005fd65615 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -227,6 +227,16 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
+  /**
+   * Returns a map from lower-case table names to their canonical form. Key and value only differ in a
+   * case-insensitive environment.
+   *
+   * @return the table name map.
+   */
+  public Map<String, String> getTableNameMap() {
+    return _tableNameMap;
+  }
+
   private void addTableConfigs(List<String> paths) {
     // Subscribe data changes before reading the data to avoid missing changes
     for (String path : paths) {
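
The new accessor simply exposes TableCache's internal name map to the query planner. As a rough
illustration (not part of this patch), a planner-side caller could resolve a canonical table name
as in the sketch below, assuming the cluster is configured with case-insensitive table names so
that the map is keyed by the lower-cased name:

    import org.apache.pinot.common.config.provider.TableCache;

    public final class TableNameResolver {
      private TableNameResolver() {
      }

      // Resolve the canonical (as-registered) table name from any-cased user input.
      // The TableCache instance is assumed to be already connected to the cluster.
      public static String resolve(TableCache tableCache, String tableName) {
        return tableCache.getTableNameMap().get(tableName.toLowerCase());
      }
    }
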
diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml
new file mode 100644
index 0000000000..8d1af64a19
--- /dev/null
+++ b/pinot-query-planner/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>pinot</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>pinot-query-planner</artifactId>
+  <name>Pinot Query Planner</name>
+  <url>https://pinot.apache.org/</url>
+
+  <properties>
+    <pinot.root>${basedir}/..</pinot.root>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Pinot dependencies -->
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+    </dependency>
+
+    <!-- Calcite dependencies -->
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>3.0.9</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>commons-compiler</artifactId>
+      <version>3.0.9</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java
new file mode 100644
index 0000000000..ce3d1c99f9
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/jdbc/CalciteSchemaBuilder.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.jdbc;
+
+import org.apache.calcite.schema.Schema;
+
+
+/**
+ * This class is used to create a {@link CalciteSchema} with a given {@link Schema} as the root.
+ *
+ * <p>This class resides in the calcite.jdbc namespace because we don't have any complex catalog-based schema
+ * construct logic; we instead create a {@link SimpleCalciteSchema}, which is package protected.
+ */
+public class CalciteSchemaBuilder {
+
+  private CalciteSchemaBuilder() {
+    // do not instantiate.
+  }
+
+  /**
+   * Creates a {@link CalciteSchema} with a given {@link Schema} as the root.
+   *
+   * <p>Calcite creates two layers of abstraction: the {@link CalciteSchema} is used internally by the planner and
+   * the {@link Schema} is the user-facing construct with overrides. Since we don't have a complex internal wrapper
+   * extension, we simply reuse the package-protected {@link SimpleCalciteSchema}.
+   *
+   * <p>If there's a need to extend this for planner functionalities, we should create our own extension of
+   * {@link CalciteSchema}.
+   *
+   * @param root schema to use as the root schema
+   * @return calcite schema with the given schema as the root
+   */
+  public static CalciteSchema asRootSchema(Schema root) {
+    return new SimpleCalciteSchema(null, root, "");
+  }
+}
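
To make the builder's role concrete, here is a minimal, self-contained sketch (not part of the
patch) that wraps Calcite's stock AbstractSchema the same way the planner wraps the PinotCatalog
introduced later in this commit:

    import org.apache.calcite.jdbc.CalciteSchema;
    import org.apache.calcite.jdbc.CalciteSchemaBuilder;
    import org.apache.calcite.schema.impl.AbstractSchema;

    public final class RootSchemaExample {
      public static void main(String[] args) {
        // An empty user-facing schema; in this patch the real implementation is PinotCatalog.
        AbstractSchema userSchema = new AbstractSchema();
        // Wrap it into the internal CalciteSchema form consumed by the validator and planner.
        CalciteSchema rootSchema = CalciteSchemaBuilder.asRootSchema(userSchema);
        // Prints "SimpleCalciteSchema", the package-protected implementation being reused.
        System.out.println(rootSchema.getClass().getSimpleName());
      }
    }
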
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
new file mode 100644
index 0000000000..215c0051d7
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import java.util.Collection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.prepare.PlannerImpl;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.sql2rel.StandardConvertletTable;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.parser.CalciteSqlParser;
+import org.apache.pinot.query.planner.LogicalPlanner;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StagePlanner;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.rules.PinotQueryRuleSets;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.validate.Validator;
+
+
+/**
+ * The {@code QueryEnvironment} contains the main entrypoint for query planning.
+ *
+ * <p>It provides the higher-level entry interface to convert a SQL string into a {@link QueryPlan}.
+ */
+public class QueryEnvironment {
+  // Calcite configurations
+  private final FrameworkConfig _config;
+
+  // Calcite extension/plugins
+  private final CalciteSchema _rootSchema;
+  private final PlannerImpl _planner;
+  private final Prepare.CatalogReader _catalogReader;
+  private final RelDataTypeFactory _typeFactory;
+  private final RelOptPlanner _relOptPlanner;
+  private final SqlValidator _validator;
+
+  // Pinot extensions
+  private final Collection<RelOptRule> _logicalRuleSet;
+  private final WorkerManager _workerManager;
+
+  public QueryEnvironment(TypeFactory typeFactory, CalciteSchema rootSchema, WorkerManager workerManager) {
+    _typeFactory = typeFactory;
+    _rootSchema = rootSchema;
+    _workerManager = workerManager;
+    _config = Frameworks.newConfigBuilder().traitDefs().build();
+
+    // Planner is not thread-safe; it must be reset() after each use.
+    _planner = new PlannerImpl(_config);
+
+    // catalog
+    _catalogReader = new CalciteCatalogReader(_rootSchema, _rootSchema.path(null), _typeFactory, null);
+    _validator = new Validator(SqlStdOperatorTable.instance(), _catalogReader, _typeFactory);
+
+    // optimizer rules
+    _logicalRuleSet = PinotQueryRuleSets.LOGICAL_OPT_RULES;
+
+    // optimizer
+    HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
+    for (RelOptRule relOptRule : _logicalRuleSet) {
+      hepProgramBuilder.addRuleInstance(relOptRule);
+    }
+    _relOptPlanner = new LogicalPlanner(hepProgramBuilder.build(), Contexts.EMPTY_CONTEXT);
+  }
+
+  /**
+   * Plan a SQL query.
+   *
+   * <p>Note that Calcite's {@link org.apache.calcite.tools.Planner} is not thread-safe: only one query can be
+   * planned at a time, and the planner needs to be reset afterwards to clear the state for the next planning.
+   *
+   * <p>For faster planning, we pre-construct all the planner objects and use the plan-then-reset model. Thus,
+   * when using {@code QueryEnvironment#planQuery(String)}, the caller should ensure that no concurrent plan
+   * execution occurs.
+   *
+   * TODO: benchmark and profile to measure whether the latency-concurrency trade-off between reusing PlannerImpl
+   * and creating a new planner for each query makes sense.
+   *
+   * @param sqlQuery SQL query string.
+   * @return a dispatchable query plan
+   */
+  public QueryPlan planQuery(String sqlQuery) {
+    PlannerContext plannerContext = new PlannerContext();
+    try {
+      SqlNode parsed = parse(sqlQuery, plannerContext);
+      SqlNode validated = validate(parsed);
+      RelRoot relation = toRelation(validated, plannerContext);
+      RelNode optimized = optimize(relation, plannerContext);
+      return toDispatchablePlan(optimized, plannerContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Error composing query plan for: " + 
sqlQuery, e);
+    } finally {
+      _planner.close();
+      _planner.reset();
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // steps
+  // --------------------------------------------------------------------------
+
+  protected SqlNode parse(String query, PlannerContext plannerContext)
+      throws Exception {
+    // 1. invoke CalciteSqlParser to parse out SqlNode;
+    return CalciteSqlParser.compile(query, plannerContext);
+  }
+
+  protected SqlNode validate(SqlNode parsed)
+      throws Exception {
+    // 2. validator to validate.
+    SqlNode validated = _validator.validate(parsed);
+    if (null == validated || !validated.getKind().belongsTo(SqlKind.QUERY)) {
+      throw new IllegalArgumentException(
+          String.format("unsupported SQL query, cannot validate out a valid 
sql from:\n%s", parsed));
+    }
+    return validated;
+  }
+
+  protected RelRoot toRelation(SqlNode parsed, PlannerContext plannerContext) {
+    // 3. convert sqlNode to relNode.
+    RexBuilder rexBuilder = new RexBuilder(_typeFactory);
+    RelOptCluster cluster = RelOptCluster.create(_relOptPlanner, rexBuilder);
+    SqlToRelConverter sqlToRelConverter =
+        new SqlToRelConverter(_planner, _validator, _catalogReader, cluster, StandardConvertletTable.INSTANCE,
+            SqlToRelConverter.config());
+    return sqlToRelConverter.convertQuery(parsed, false, true);
+  }
+
+  protected RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
+    // 4. optimize relNode
+    // TODO: add support for traits, cost factory.
+    try {
+      _relOptPlanner.setRoot(relRoot.rel);
+      return _relOptPlanner.findBestExp();
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Cannot generate a valid execution plan for the given query: " + 
RelOptUtil.toString(relRoot.rel), e);
+    }
+  }
+
+  protected QueryPlan toDispatchablePlan(RelNode relRoot, PlannerContext plannerContext) {
+    // 5. construct a dispatchable query plan.
+    StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager);
+    return queryStagePlanner.makePlan(relRoot);
+  }
+}
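
As a usage illustration (not part of the patch), the call pattern QueryEnvironment is designed for
looks roughly like the sketch below; the TypeFactory, root schema and WorkerManager are assumed to
be constructed by the serving layer:

    import org.apache.calcite.jdbc.CalciteSchema;
    import org.apache.pinot.query.QueryEnvironment;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.routing.WorkerManager;
    import org.apache.pinot.query.type.TypeFactory;

    public final class PlanOnce {
      private PlanOnce() {
      }

      // One planning round trip: parse -> validate -> to relation -> optimize -> dispatchable plan.
      // A QueryEnvironment must not be shared across concurrent planQuery() calls.
      public static QueryPlan plan(TypeFactory typeFactory, CalciteSchema rootSchema,
          WorkerManager workerManager, String sql) {
        QueryEnvironment env = new QueryEnvironment(typeFactory, rootSchema, workerManager);
        return env.planQuery(sql);
      }
    }
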
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
new file mode 100644
index 0000000000..34673a274b
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.catalog;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * Simple catalog that only contains a list of tables. Backed by {@link TableCache}.
+ *
+ * <p>The catalog is needed for utilizing Apache Calcite's validator, which requires a root schema to store the
+ * entire catalog. Since Pinot doesn't have a nested sub-catalog concept, we just return a flat list of schemas.
+ */
+public class PinotCatalog implements Schema {
+
+  private final TableCache _tableCache;
+
+  /**
+   * PinotCatalog needs access to the actual {@link TableCache} object because TableCache hosts the actual
+   * tables available for query and processes table/segment metadata updates when the cluster status changes.
+   */
+  public PinotCatalog(TableCache tableCache) {
+    _tableCache = tableCache;
+  }
+
+  /**
+   * Acquire a table by its name.
+   * @param name name of the table.
+   * @return table object used by calcite planner.
+   */
+  @Override
+  public Table getTable(String name) {
+    String tableName = TableNameBuilder.extractRawTableName(name);
+    return new PinotTable(_tableCache.getSchema(tableName));
+  }
+
+  /**
+   * Acquire the set of available table names.
+   * @return the set of table names at the time of query planning.
+   */
+  @Override
+  public Set<String> getTableNames() {
+    return _tableCache.getTableNameMap().keySet();
+  }
+
+  @Override
+  public RelProtoDataType getType(String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getTypeNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Collection<Function> getFunctions(String name) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Set<String> getFunctionNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Schema getSubSchema(String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
+    requireNonNull(parentSchema, "parentSchema");
+    return Schemas.subSchemaExpression(parentSchema, name, getClass());
+  }
+
+  @Override
+  public boolean isMutable() {
+    return false;
+  }
+
+  @Override
+  public Schema snapshot(SchemaVersion version) {
+    return this;
+  }
+}
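
A minimal test-style sketch (not part of the patch, using the module's Mockito test dependency) of
how the catalog plugs into the schema builder; the mocked TableCache stands in for a real
cluster-backed instance:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import org.apache.calcite.jdbc.CalciteSchema;
    import org.apache.calcite.jdbc.CalciteSchemaBuilder;
    import org.apache.pinot.common.config.provider.TableCache;
    import org.apache.pinot.query.catalog.PinotCatalog;

    public final class PinotCatalogExample {
      public static void main(String[] args) {
        // Stand-in for a ZK-backed TableCache; only the name map is needed here.
        TableCache tableCache = mock(TableCache.class);
        when(tableCache.getTableNameMap()).thenReturn(Collections.singletonMap("mytable", "myTable"));

        PinotCatalog catalog = new PinotCatalog(tableCache);
        // The flat catalog becomes the root schema handed to QueryEnvironment.
        CalciteSchema rootSchema = CalciteSchemaBuilder.asRootSchema(catalog);
        System.out.println(catalog.getTableNames());  // prints [mytable]
      }
    }
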
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
new file mode 100644
index 0000000000..23e6444e90
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.catalog;
+
+import com.clearspring.analytics.util.Preconditions;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Wrapper for Pinot's internal info for a table.
+ *
+ * <p>This construct is used to connect a Pinot table to Apache Calcite's relational planner by providing a
+ * {@link RelDataType} of the table to the planner.
+ */
+public class PinotTable extends AbstractTable implements ScannableTable {
+  private Schema _schema;
+
+  public PinotTable(Schema schema) {
+    _schema = schema;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+    Preconditions.checkState(relDataTypeFactory instanceof TypeFactory);
+    TypeFactory typeFactory = (TypeFactory) relDataTypeFactory;
+    return typeFactory.createRelDataTypeFromSchema(_schema);
+  }
+
+  @Override
+  public boolean isRolledUp(String s) {
+    return false;
+  }
+
+  @Override
+  public Enumerable<Object[]> scan(DataContext dataContext) {
+    return null;
+  }
+}
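
For context, a small sketch (not part of the patch) of how a table's Pinot schema gets wrapped for
the planner; the row type itself is derived later by Calcite via the module's TypeFactory, whose
construction is not shown here:

    import org.apache.pinot.query.catalog.PinotTable;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;

    public final class PinotTableExample {
      public static void main(String[] args) {
        // Build a simple Pinot schema with one dimension and one metric column.
        Schema schema = new Schema.SchemaBuilder()
            .setSchemaName("myTable")
            .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
            .addMetric("col2", FieldSpec.DataType.LONG)
            .build();
        // The planner sees the table through this wrapper; getRowType() is later invoked
        // with the module's TypeFactory to produce the RelDataType.
        PinotTable table = new PinotTable(schema);
        System.out.println(table.isRolledUp("col1"));  // prints false
      }
    }
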
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
new file mode 100644
index 0000000000..1997d1f370
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.context;
+
+import java.util.Map;
+
+
+/**
+ * PlannerContext is an object that holds all contextual information during the planning phase.
+ *
+ * TODO: currently the planner context is not used since we don't support options or query rewrite. This construct is
+ * here as a placeholder for the parsed-out options.
+ */
+public class PlannerContext {
+  private Map<String, String> _options;
+
+  public void setOptions(Map<String, String> options) {
+    _options = options;
+  }
+
+  public Map<String, String> getOptions() {
+    return _options;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java
new file mode 100644
index 0000000000..fc75efb1ae
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Calcite parser to convert SQL expressions into {@link Expression}.
+ *
+ * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It only contains the
+ * {@link Expression}-related info, which is used for ingestion and query rewrite.
+ */
+public class CalciteExpressionParser {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CalciteExpressionParser.class);
+
+  private CalciteExpressionParser() {
+    // do not instantiate.
+  }
+
+  private static List<Expression> getAliasLeftExpressionsFromDistinctExpression(Function function) {
+    List<Expression> operands = function.getOperands();
+    List<Expression> expressions = new ArrayList<>(operands.size());
+    for (Expression operand : operands) {
+      if (isAsFunction(operand)) {
+        expressions.add(operand.getFunctionCall().getOperands().get(0));
+      } else {
+        expressions.add(operand);
+      }
+    }
+    return expressions;
+  }
+
+  public static boolean isAggregateExpression(Expression expression) {
+    Function functionCall = expression.getFunctionCall();
+    if (functionCall != null) {
+      String operator = functionCall.getOperator();
+      try {
+        AggregationFunctionType.getAggregationFunctionType(operator);
+        return true;
+      } catch (IllegalArgumentException e) {
+      }
+      if (functionCall.getOperandsSize() > 0) {
+        for (Expression operand : functionCall.getOperands()) {
+          if (isAggregateExpression(operand)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  public static boolean isAsFunction(Expression expression) {
+    return expression.getFunctionCall() != null && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS");
+  }
+
+  /**
+   * Extracts all the identifiers from the given expressions.
+   *
+   * @param expressions expressions to extract identifiers from.
+   * @param excludeAs if true, ignores the right-side identifier of the AS function.
+   * @return all the identifier names.
+   */
+  public static Set<String> extractIdentifiers(List<Expression> expressions, boolean excludeAs) {
+    Set<String> identifiers = new HashSet<>();
+    for (Expression expression : expressions) {
+      if (expression.getIdentifier() != null) {
+        identifiers.add(expression.getIdentifier().getName());
+      } else if (expression.getFunctionCall() != null) {
+        if (excludeAs && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) {
+          identifiers.addAll(
+              extractIdentifiers(Arrays.asList(expression.getFunctionCall().getOperands().get(0)), true));
+          continue;
+        } else {
+          identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands(), excludeAs));
+        }
+      }
+    }
+    return identifiers;
+  }
+
+  /**
+   * Compiles a String expression into {@link Expression}.
+   *
+   * @param expression String expression.
+   * @return {@link Expression} equivalent of the string.
+   *
+   * @throws SqlCompilationException if String is not a valid expression.
+   */
+  public static Expression compileToExpression(String expression) {
+    SqlParser sqlParser = SqlParser.create(expression, ParserUtils.PARSER_CONFIG);
+    SqlNode sqlNode;
+    try {
+      sqlNode = sqlParser.parseExpression();
+    } catch (SqlParseException e) {
+      throw new SqlCompilationException("Caught exception while parsing 
expression: " + expression, e);
+    }
+    return toExpression(sqlNode);
+  }
+
+  private static List<Expression> convertDistinctSelectList(SqlNodeList selectList) {
+    List<Expression> selectExpr = new ArrayList<>();
+    selectExpr.add(convertDistinctAndSelectListToFunctionExpression(selectList));
+    return selectExpr;
+  }
+
+  private static List<Expression> convertSelectList(SqlNodeList selectList) {
+    List<Expression> selectExpr = new ArrayList<>();
+
+    final Iterator<SqlNode> iterator = selectList.iterator();
+    while (iterator.hasNext()) {
+      final SqlNode next = iterator.next();
+      selectExpr.add(toExpression(next));
+    }
+
+    return selectExpr;
+  }
+
+  private static List<Expression> convertOrderByList(SqlNodeList orderList) {
+    List<Expression> orderByExpr = new ArrayList<>();
+    final Iterator<SqlNode> iterator = orderList.iterator();
+    while (iterator.hasNext()) {
+      final SqlNode next = iterator.next();
+      orderByExpr.add(convertOrderBy(next));
+    }
+    return orderByExpr;
+  }
+
+  private static Expression convertOrderBy(SqlNode node) {
+    final SqlKind kind = node.getKind();
+    Expression expression;
+    switch (kind) {
+      case DESCENDING:
+        SqlBasicCall basicCall = (SqlBasicCall) node;
+        expression = RequestUtils.getFunctionExpression("DESC");
+        expression.getFunctionCall().addToOperands(toExpression(basicCall.getOperandList().get(0)));
+        break;
+      case IDENTIFIER:
+      default:
+        expression = RequestUtils.getFunctionExpression("ASC");
+        expression.getFunctionCall().addToOperands(toExpression(node));
+        break;
+    }
+    return expression;
+  }
+
+  /**
+   * DISTINCT is implemented as an aggregation function, so we need to take the select list items
+   * and convert them into a single function expression for handing over to the execution engine,
+   * either as a PinotQuery or a BrokerRequest via conversion.
+   * @param selectList select list items
+   * @return DISTINCT function expression
+   */
+  private static Expression convertDistinctAndSelectListToFunctionExpression(SqlNodeList selectList) {
+    String functionName = AggregationFunctionType.DISTINCT.getName();
+    Expression functionExpression = RequestUtils.getFunctionExpression(functionName);
+    for (SqlNode node : selectList) {
+      Expression columnExpression = toExpression(node);
+      if (columnExpression.getType() == ExpressionType.IDENTIFIER && columnExpression.getIdentifier().getName()
+          .equals("*")) {
+        throw new SqlCompilationException(
+            "Syntax error: Pinot currently does not support DISTINCT with *. Please specify each column name after "
+                + "DISTINCT keyword");
+      } else if (columnExpression.getType() == ExpressionType.FUNCTION) {
+        Function functionCall = columnExpression.getFunctionCall();
+        String function = functionCall.getOperator();
+        if (AggregationFunctionType.isAggregationFunction(function)) {
+          throw new SqlCompilationException(
+              "Syntax error: Use of DISTINCT with aggregation functions is not 
supported");
+        }
+      }
+      functionExpression.getFunctionCall().addToOperands(columnExpression);
+    }
+    return functionExpression;
+  }
+
+  private static Expression toExpression(SqlNode node) {
+    LOGGER.debug("Current processing SqlNode: {}, node.getKind(): {}", node, 
node.getKind());
+    switch (node.getKind()) {
+      case IDENTIFIER:
+        if (((SqlIdentifier) node).isStar()) {
+          return RequestUtils.getIdentifierExpression("*");
+        }
+        if (((SqlIdentifier) node).isSimple()) {
+          return RequestUtils.getIdentifierExpression(((SqlIdentifier) node).getSimple());
+        }
+        return RequestUtils.getIdentifierExpression(node.toString());
+      case LITERAL:
+        return RequestUtils.getLiteralExpression((SqlLiteral) node);
+      case AS:
+        SqlBasicCall asFuncSqlNode = (SqlBasicCall) node;
+        List<SqlNode> operands = asFuncSqlNode.getOperandList();
+        Expression leftExpr = toExpression(operands.get(0));
+        SqlNode aliasSqlNode = operands.get(1);
+        String aliasName;
+        switch (aliasSqlNode.getKind()) {
+          case IDENTIFIER:
+            aliasName = ((SqlIdentifier) aliasSqlNode).getSimple();
+            break;
+          case LITERAL:
+            aliasName = ((SqlLiteral) aliasSqlNode).toValue();
+            break;
+          default:
+            throw new SqlCompilationException("Unsupported Alias sql node - " 
+ aliasSqlNode);
+        }
+        Expression rightExpr = RequestUtils.getIdentifierExpression(aliasName);
+        // Just return left identifier if both sides are the same identifier.
+        if (leftExpr.isSetIdentifier() && rightExpr.isSetIdentifier()) {
+          if (leftExpr.getIdentifier().getName().equals(rightExpr.getIdentifier().getName())) {
+            return leftExpr;
+          }
+        }
+        final Expression asFuncExpr = RequestUtils.getFunctionExpression(SqlKind.AS.toString());
+        asFuncExpr.getFunctionCall().addToOperands(leftExpr);
+        asFuncExpr.getFunctionCall().addToOperands(rightExpr);
+        return asFuncExpr;
+      case CASE:
+        // A CASE WHEN statement is modeled as a function with variable-length parameters.
+        // Assume N is the number of WHEN statements; the total number of parameters is (2 * N + 1).
+        // - N: Convert each WHEN Statement into a function Expression;
+        // - N: Convert each THEN Statement into an Expression;
+        // - 1: Convert ELSE Statement into an Expression.
+        SqlCase caseSqlNode = (SqlCase) node;
+        SqlNodeList whenOperands = caseSqlNode.getWhenOperands();
+        SqlNodeList thenOperands = caseSqlNode.getThenOperands();
+        SqlNode elseOperand = caseSqlNode.getElseOperand();
+        Expression caseFuncExpr = RequestUtils.getFunctionExpression(SqlKind.CASE.name());
+        for (SqlNode whenSqlNode : whenOperands.getList()) {
+          Expression whenExpression = toExpression(whenSqlNode);
+          if (isAggregateExpression(whenExpression)) {
+            throw new SqlCompilationException(
+                "Aggregation functions inside WHEN Clause is not supported - " 
+ whenSqlNode);
+          }
+          caseFuncExpr.getFunctionCall().addToOperands(whenExpression);
+        }
+        for (SqlNode thenSqlNode : thenOperands.getList()) {
+          Expression thenExpression = toExpression(thenSqlNode);
+          if (isAggregateExpression(thenExpression)) {
+            throw new SqlCompilationException(
+                "Aggregation functions inside THEN Clause is not supported - " 
+ thenSqlNode);
+          }
+          caseFuncExpr.getFunctionCall().addToOperands(thenExpression);
+        }
+        Expression elseExpression = toExpression(elseOperand);
+        if (isAggregateExpression(elseExpression)) {
+          throw new SqlCompilationException(
+              "Aggregation functions inside ELSE Clause is not supported - " + 
elseExpression);
+        }
+        caseFuncExpr.getFunctionCall().addToOperands(elseExpression);
+        return caseFuncExpr;
+      default:
+        if (node instanceof SqlDataTypeSpec) {
+          // This is to handle expression like: CAST(col AS INT)
+          return RequestUtils.getLiteralExpression(((SqlDataTypeSpec) node).getTypeName().getSimple());
+        } else {
+          return compileFunctionExpression((SqlBasicCall) node);
+        }
+    }
+  }
+
+  private static Expression compileFunctionExpression(SqlBasicCall functionNode) {
+    SqlKind functionKind = functionNode.getKind();
+    String functionName;
+    switch (functionKind) {
+      case AND:
+        return compileAndExpression(functionNode);
+      case OR:
+        return compileOrExpression(functionNode);
+      case COUNT:
+        SqlLiteral functionQuantifier = functionNode.getFunctionQuantifier();
+        if (functionQuantifier != null && functionQuantifier.toValue().equalsIgnoreCase("DISTINCT")) {
+          functionName = AggregationFunctionType.DISTINCTCOUNT.name();
+        } else {
+          functionName = AggregationFunctionType.COUNT.name();
+        }
+        break;
+      case OTHER:
+      case OTHER_FUNCTION:
+      case DOT:
+        functionName = functionNode.getOperator().getName().toUpperCase();
+        if (functionName.equals("ITEM") || functionName.equals("DOT")) {
+          // Calcite parses path expressions such as "data[0][1].a.b[0]" into a chain of ITEM and/or DOT
+          // functions. Collapse this chain into an identifier.
+          StringBuffer path = new StringBuffer();
+          compilePathExpression(functionName, functionNode, path);
+          return RequestUtils.getIdentifierExpression(path.toString());
+        }
+        break;
+      default:
+        functionName = functionKind.name();
+        break;
+    }
+    // When there is no argument, set an empty list as the operands
+    List<SqlNode> childNodes = functionNode.getOperandList();
+    List<Expression> operands = new ArrayList<>(childNodes.size());
+    for (SqlNode childNode : childNodes) {
+      if (childNode instanceof SqlNodeList) {
+        for (SqlNode node : (SqlNodeList) childNode) {
+          operands.add(toExpression(node));
+        }
+      } else {
+        operands.add(toExpression(childNode));
+      }
+    }
+    validateFunction(functionName, operands);
+    Expression functionExpression = RequestUtils.getFunctionExpression(functionName);
+    functionExpression.getFunctionCall().setOperands(operands);
+    return functionExpression;
+  }
+
+  /**
+   * Convert a Calcite operator tree made up of ITEM and DOT functions to an identifier. For example, the operator
+   * tree shown below will be converted to IDENTIFIER "jsoncolumn.data[0][1].a.b[0]".
+   *
+   * ├── ITEM(jsoncolumn.data[0][1].a.b[0])
+   *      ├── LITERAL (0)
+   *      └── DOT (jsoncolumn.data[0][1].a.b)
+   *            ├── IDENTIFIER (b)
+   *            └── DOT (jsoncolumn.data[0][1].a)
+   *                  ├── IDENTIFIER (a)
+   *                  └── ITEM (jsoncolumn.data[0][1])
+   *                        ├── LITERAL (1)
+   *                        └── ITEM (jsoncolumn.data[0])
+   *                              ├── LITERAL (0)
+   *                              └── IDENTIFIER (jsoncolumn.data)
+   *
+   * @param functionName Name of the function ("DOT" or "ITEM")
+   * @param functionNode Root node of the DOT and/or ITEM operator function chain.
+   * @param path String representation of the path represented by the DOT and/or ITEM function chain.
+   */
+  private static void compilePathExpression(String functionName, SqlBasicCall functionNode, StringBuffer path) {
+    List<SqlNode> operands = functionNode.getOperandList();
+
+    // Compile first operand of the function (either an identifier or another DOT and/or ITEM function).
+    SqlKind kind0 = operands.get(0).getKind();
+    if (kind0 == SqlKind.IDENTIFIER) {
+      path.append(operands.get(0).toString());
+    } else if (kind0 == SqlKind.DOT || kind0 == SqlKind.OTHER_FUNCTION) {
+      SqlBasicCall function0 = (SqlBasicCall) operands.get(0);
+      String name0 = function0.getOperator().getName();
+      if (name0.equals("ITEM") || name0.equals("DOT")) {
+        compilePathExpression(name0, function0, path);
+      } else {
+        throw new SqlCompilationException("SELECT list item has bad path 
expression.");
+      }
+    } else {
+      throw new SqlCompilationException("SELECT list item has bad path 
expression.");
+    }
+
+    // Compile second operand of the function (either an identifier or 
literal).
+    SqlKind kind1 = operands.get(1).getKind();
+    if (kind1 == SqlKind.IDENTIFIER) {
+      path.append(".").append(((SqlIdentifier) operands.get(1)).getSimple());
+    } else if (kind1 == SqlKind.LITERAL) {
+      path.append("[").append(((SqlLiteral) 
operands.get(1)).toValue()).append("]");
+    } else {
+      throw new SqlCompilationException("SELECT list item has bad path 
expression.");
+    }
+  }
+
+  public static String canonicalize(String functionName) {
+    return StringUtils.remove(functionName, '_').toLowerCase();
+  }
+
+  public static boolean isSameFunction(String function1, String function2) {
+    return canonicalize(function1).equals(canonicalize(function2));
+  }
+
+  private static void validateFunction(String functionName, List<Expression> operands) {
+    switch (canonicalize(functionName)) {
+      case "jsonextractscalar":
+        validateJsonExtractScalarFunction(operands);
+        break;
+      case "jsonextractkey":
+        validateJsonExtractKeyFunction(operands);
+        break;
+      default:
+        break;
+    }
+  }
+
+  private static void validateJsonExtractScalarFunction(List<Expression> operands) {
+    int numOperands = operands.size();
+
+    // Check that there are exactly 3 or 4 arguments
+    if (numOperands != 3 && numOperands != 4) {
+      throw new SqlCompilationException(
+          "Expect 3 or 4 arguments for transform function: 
jsonExtractScalar(jsonFieldName, 'jsonPath', "
+              + "'resultsType', ['defaultValue'])");
+    }
+    if (!operands.get(1).isSetLiteral() || !operands.get(2).isSetLiteral() || 
(numOperands == 4 && !operands.get(3)
+        .isSetLiteral())) {
+      throw new SqlCompilationException(
+          "Expect the 2nd/3rd/4th argument of transform function: 
jsonExtractScalar(jsonFieldName, 'jsonPath',"
+              + " 'resultsType', ['defaultValue']) to be a single-quoted 
literal value.");
+    }
+  }
+
+  private static void validateJsonExtractKeyFunction(List<Expression> operands) {
+    // Check that there are exactly 2 arguments
+    if (operands.size() != 2) {
+      throw new SqlCompilationException(
+          "Expect 2 arguments are required for transform function: jsonExtractKey(jsonFieldName, 'jsonPath')");
+    }
+    if (!operands.get(1).isSetLiteral()) {
+      throw new SqlCompilationException(
+          "Expect the 2nd argument for transform function: jsonExtractKey(jsonFieldName, 'jsonPath') to be a "
+              + "single-quoted literal value.");
+    }
+  }
+
+  /**
+   * Helper method to flatten the operands for the AND expression.
+   */
+  private static Expression compileAndExpression(SqlBasicCall andNode) {
+    List<Expression> operands = new ArrayList<>();
+    for (SqlNode childNode : andNode.getOperandList()) {
+      if (childNode.getKind() == SqlKind.AND) {
+        Expression childAndExpression = compileAndExpression((SqlBasicCall) childNode);
+        operands.addAll(childAndExpression.getFunctionCall().getOperands());
+      } else {
+        operands.add(toExpression(childNode));
+      }
+    }
+    Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.AND.name());
+    andExpression.getFunctionCall().setOperands(operands);
+    return andExpression;
+  }
+
+  /**
+   * Helper method to flatten the operands for the OR expression.
+   */
+  private static Expression compileOrExpression(SqlBasicCall orNode) {
+    List<Expression> operands = new ArrayList<>();
+    for (SqlNode childNode : orNode.getOperandList()) {
+      if (childNode.getKind() == SqlKind.OR) {
+        Expression childAndExpression = compileOrExpression((SqlBasicCall) childNode);
+        operands.addAll(childAndExpression.getFunctionCall().getOperands());
+      } else {
+        operands.add(toExpression(childNode));
+      }
+    }
+    Expression andExpression = RequestUtils.getFunctionExpression(SqlKind.OR.name());
+    andExpression.getFunctionCall().setOperands(operands);
+    return andExpression;
+  }
+
+  public static boolean isLiteralOnlyExpression(Expression e) {
+    if (e.getType() == ExpressionType.LITERAL) {
+      return true;
+    }
+    if (e.getType() == ExpressionType.FUNCTION) {
+      Function functionCall = e.getFunctionCall();
+      if (functionCall.getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
+        return isLiteralOnlyExpression(functionCall.getOperands().get(0));
+      }
+      return false;
+    }
+    return false;
+  }
+}
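
A small illustration (not part of the patch) of the public entry point: per the javadoc above, a
JSON path expression collapses back into a single identifier, and AND operands are flattened into
one function expression. The column names used here are purely illustrative:

    import org.apache.pinot.common.request.Expression;
    import org.apache.pinot.query.parser.CalciteExpressionParser;

    public final class ExpressionParserExample {
      public static void main(String[] args) {
        // Calcite parses the path into an ITEM/DOT chain, which the parser collapses
        // back into one identifier expression: "jsonColumn.data[0].a".
        Expression path = CalciteExpressionParser.compileToExpression("jsonColumn.data[0].a");
        System.out.println(path.getIdentifier().getName());

        // Nested AND calls are flattened into a single AND function with three operands.
        Expression filter = CalciteExpressionParser.compileToExpression("a > 1 AND b < 2 AND c = 3");
        System.out.println(filter.getFunctionCall().getOperandsSize());  // prints 3
      }
    }
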
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java
new file mode 100644
index 0000000000..d67896f9b9
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteSqlParser.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class provides an API to parse a SQL string into a Pinot query {@link SqlNode}.
+ *
+ * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic
+ * to parse SQL into {@link SqlNode} and uses {@link QueryRewriter} to rewrite the query with Pinot-specific
+ * contextual info.
+ */
+public class CalciteSqlParser {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CalciteSqlParser.class);
+
+  private CalciteSqlParser() {
+    // do not instantiate.
+  }
+
+  /**
+   * Entrypoint for the SQL parser.
+   */
+  public static SqlNode compile(String sql, PlannerContext plannerContext)
+      throws SqlCompilationException {
+    // Extract OPTION statements from the sql, as the Calcite parser doesn't parse them.
+    // TODO: use parser syntax extension instead.
+    Map<String, String> options = parseOptions(extractOptionsFromSql(sql));
+    plannerContext.setOptions(options);
+    if (!options.isEmpty()) {
+      sql = removeOptionsFromSql(sql);
+    }
+    // Compile Sql without OPTION statements.
+    SqlNode parsed = parse(sql);
+
+    // query rewrite.
+    return QueryRewriter.rewrite(parsed, plannerContext);
+  }
+
+  // ==========================================================================
+  // Static utils to parse the SQL.
+  // ==========================================================================
+
+  private static Map<String, String> parseOptions(List<String> optionsStatements) {
+    if (optionsStatements.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Map<String, String> options = new HashMap<>();
+    for (String optionsStatement : optionsStatements) {
+      for (String option : optionsStatement.split(",")) {
+        final String[] splits = option.split("=");
+        if (splits.length != 2) {
+          throw new SqlCompilationException("OPTION statement requires two 
parts separated by '='");
+        }
+        options.put(splits[0].trim(), splits[1].trim());
+      }
+    }
+    return options;
+  }
+
+  private static SqlNode parse(String sql) {
+    SqlParser sqlParser = SqlParser.create(sql, ParserUtils.PARSER_CONFIG);
+    SqlNode sqlNode;
+    try {
+      sqlNode = sqlParser.parseQuery();
+    } catch (SqlParseException e) {
+      throw new SqlCompilationException("Caught exception while parsing query: 
" + sql, e);
+    }
+
+    // This is a special rewrite,
+    // TODO: move it to planner later.
+    SqlSelect selectNode;
+    if (sqlNode instanceof SqlOrderBy) {
+      // Store order-by info into the select sql node
+      SqlOrderBy orderByNode = (SqlOrderBy) sqlNode;
+      selectNode = (SqlSelect) orderByNode.query;
+      selectNode.setOrderBy(orderByNode.orderList);
+      selectNode.setFetch(orderByNode.fetch);
+      selectNode.setOffset(orderByNode.offset);
+    } else {
+      selectNode = (SqlSelect) sqlNode;
+    }
+    return selectNode;
+  }
+
+  private static List<String> extractOptionsFromSql(String sql) {
+    List<String> results = new ArrayList<>();
+    Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql);
+    while (matcher.find()) {
+      results.add(matcher.group(1));
+    }
+    return results;
+  }
+
+  private static String removeOptionsFromSql(String sql) {
+    Matcher matcher = ParserUtils.OPTIONS_REGEX_PATTEN.matcher(sql);
+    return matcher.replaceAll("");
+  }
+}
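
Since the OPTION handling is regex-based (see ParserUtils below), here is a standalone sketch (not
part of the patch) of the mechanics the private helpers implement: strip OPTION (...) blocks out of
the SQL and turn their key=value pairs into a map. The option keys shown are purely illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public final class OptionsExtractionExample {
      // Same pattern as ParserUtils.OPTIONS_REGEX_PATTEN.
      private static final Pattern OPTIONS_PATTERN =
          Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);

      public static void main(String[] args) {
        String sql = "SELECT col1 FROM myTable OPTION (timeoutMs = 1000, k2 = v2)";

        Map<String, String> options = new HashMap<>();
        Matcher matcher = OPTIONS_PATTERN.matcher(sql);
        while (matcher.find()) {
          // Each captured group is a comma-separated list of key=value pairs.
          for (String option : matcher.group(1).split(",")) {
            String[] kv = option.split("=");
            options.put(kv[0].trim(), kv[1].trim());
          }
        }
        String strippedSql = OPTIONS_PATTERN.matcher(sql).replaceAll("").trim();

        System.out.println(options);      // e.g. {k2=v2, timeoutMs=1000}
        System.out.println(strippedSql);  // SELECT col1 FROM myTable
      }
    }
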
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java
new file mode 100644
index 0000000000..5422382509
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/ParserUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import java.util.regex.Pattern;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.babel.SqlBabelParserImpl;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+
+
+/**
+ * Utilities provided to the Calcite parser.
+ *
+ * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser} for its static constructs.
+ */
+final class ParserUtils {
+  /** Lexical policy similar to MySQL with ANSI_QUOTES option enabled. (To be
+   * precise: MySQL on Windows; MySQL on Linux uses case-sensitive matching,
+   * like the Linux file system.) The case of identifiers is preserved whether
+   * or not they are quoted; after which, identifiers are matched
+   * case-insensitively. Double quotes allow identifiers to contain
+   * non-alphanumeric characters. */
+  static final Lex PINOT_LEX = Lex.MYSQL_ANSI;
+
+  // BABEL is a very liberal conformance value that allows anything supported by any dialect
+  static final SqlParser.Config PARSER_CONFIG =
+      SqlParser.configBuilder().setLex(PINOT_LEX).setConformance(SqlConformanceEnum.BABEL)
+          .setParserFactory(SqlBabelParserImpl.FACTORY).build();
+
+  // TODO: move this to use parser syntax extension.
+  // To keep backward compatibility with the 'OPTION' functionality in PQL, which is used to
+  // provide more hints for query processing.
+  //
+  // PQL syntax is: `OPTION (<key> = <value>)`
+  //
+  // Multiple OPTIONs are also supported by:
+  // either
+  //   `OPTION (<k1> = <v1>, <k2> = <v2>, <k3> = <v3>)`
+  // or
+  //   `OPTION (<k1> = <v1>) OPTION (<k2> = <v2>) OPTION (<k3> = <v3>)`
+  static final Pattern OPTIONS_REGEX_PATTEN = Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);
+
+  private ParserUtils() {
+    // do not instantiate.
+  }
+}
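
The OPTION handling above is regex-based, so it can be exercised in isolation. A minimal standalone sketch of how the pattern extracts and strips OPTION clauses (the demo class and the sample SQL string are hypothetical and not part of this patch; the pattern itself is copied from ParserUtils):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class OptionsRegexDemo {
      // same pattern as ParserUtils.OPTIONS_REGEX_PATTEN
      private static final Pattern OPTIONS_REGEX_PATTEN =
          Pattern.compile("option\\s*\\(([^\\)]+)\\)", Pattern.CASE_INSENSITIVE);

      public static void main(String[] args) {
        String sql = "SELECT col1 FROM tbl OPTION (timeoutMs = 1000) OPTION (k2 = v2, k3 = v3)";

        // extract each OPTION group, mirroring extractOptionsFromSql in CalciteSqlParser above
        List<String> options = new ArrayList<>();
        Matcher matcher = OPTIONS_REGEX_PATTEN.matcher(sql);
        while (matcher.find()) {
          options.add(matcher.group(1));
        }
        System.out.println(options);                        // [timeoutMs = 1000, k2 = v2, k3 = v3]

        // strip the OPTION clauses, mirroring removeOptionsFromSql (replaceAll resets the matcher first)
        System.out.println(matcher.replaceAll("").trim());  // SELECT col1 FROM tbl
      }
    }
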
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
new file mode 100644
index 0000000000..8cb0060416
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/QueryRewriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.query.context.PlannerContext;
+
+/**
+ * Rewrites the query based on user options as well as built-in rules.
+ */
+public class QueryRewriter {
+
+  private QueryRewriter() {
+    // do not instantiate.
+  }
+
+  /**
+   * Entrypoint to execute the query rewrite.
+   *
+   * It should be functionally identical to running {@link org.apache.pinot.sql.parsers.rewriter.QueryRewriter},
+   * but it operates on a {@link SqlNode} tree instead of a flat Pinot query object.
+   *
+   * @param sqlNodeRoot root of the sqlNode tree
+   * @param plannerContext planner context
+   * @return rewritten sqlNode.
+   */
+  public static SqlNode rewrite(SqlNode sqlNodeRoot, PlannerContext 
plannerContext) {
+    return sqlNodeRoot;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
new file mode 100644
index 0000000000..9844916490
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/LogicalPlanner.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+
+
+/**
+ * The {@code LogicalPlanner} is an extended implementation of Calcite's {@link HepPlanner}.
+ */
+public class LogicalPlanner extends HepPlanner {
+
+  private List<RelTraitDef> _traitDefs;
+
+  public LogicalPlanner(HepProgram program, Context context) {
+    super(program, context);
+    _traitDefs = new ArrayList<>();
+  }
+
+  @Override
+  public boolean addRelTraitDef(RelTraitDef relTraitDef) {
+    return !_traitDefs.contains(relTraitDef) && _traitDefs.add(relTraitDef);
+  }
+
+  @Override
+  public List<RelTraitDef> getRelTraitDefs() {
+    return _traitDefs;
+  }
+
+  @Override
+  public RelTraitSet emptyTraitSet() {
+    RelTraitSet traitSet = super.emptyTraitSet();
+    for (RelTraitDef traitDef : _traitDefs) {
+      if (traitDef.multiple()) {
+        // multiple traits per RelTraitDef are not supported; only the default trait is added below.
+      }
+      traitSet = traitSet.plus(traitDef.getDefault());
+    }
+    return traitSet;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
new file mode 100644
index 0000000000..bdaba4ac70
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.Map;
+import org.apache.pinot.query.planner.nodes.StageNode;
+
+
+/**
+ * The {@code QueryPlan} is the dispatchable query execution plan produced from the result of {@link LogicalPlanner}.
+ *
+ * <p>QueryPlan should contain the necessary stage boundary information and the cross-stage exchange information
+ * needed to:
+ * <ul>
+ *   <li>dispatch individual stages to executors.</li>
+ *   <li>instruct stage executors to establish connection channels to other stages.</li>
+ *   <li>encode data blocks for transfer between stages based on the partitioning scheme.</li>
+ * </ul>
+ */
+public class QueryPlan {
+  private Map<String, StageNode> _queryStageMap;
+  private Map<String, StageMetadata> _stageMetadataMap;
+
+  public QueryPlan(Map<String, StageNode> queryStageMap, Map<String, 
StageMetadata> stageMetadataMap) {
+    _queryStageMap = queryStageMap;
+    _stageMetadataMap = stageMetadataMap;
+  }
+
+  /**
+   * Get the map between stageID and the stage plan root node.
+   * @return stage plan map.
+   */
+  public Map<String, StageNode> getQueryStageMap() {
+    return _queryStageMap;
+  }
+
+  /**
+   * Get the stage metadata information.
+   * @return stage metadata info.
+   */
+  public Map<String, StageMetadata> getStageMetadataMap() {
+    return _stageMetadataMap;
+  }
+}
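
As a rough sketch of how a dispatcher could consume the two maps, here is a hypothetical walker (the class name and the printing are illustrative only; the real dispatcher is not part of this patch):

    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.core.transport.ServerInstance;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.planner.StageMetadata;
    import org.apache.pinot.query.planner.nodes.StageNode;

    public class QueryPlanWalker {
      // Print which stage root goes to which worker; a real dispatcher would serialize the
      // StageNode tree and ship it to each assigned server instead of printing.
      public static void describe(QueryPlan queryPlan) {
        Map<String, StageMetadata> stageMetadataMap = queryPlan.getStageMetadataMap();
        for (Map.Entry<String, StageNode> e : queryPlan.getQueryStageMap().entrySet()) {
          String stageId = e.getKey();
          StageMetadata metadata = stageMetadataMap.get(stageId);
          for (ServerInstance server : metadata.getServerInstances()) {
            List<String> segments = metadata.getServerInstanceToSegmentsMap().get(server);
            System.out.println("stage " + stageId + " -> " + server + ", segments=" + segments);
          }
        }
      }
    }
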
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
new file mode 100644
index 0000000000..d167521f20
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.pinot.query.planner.nodes.CalcNode;
+import org.apache.pinot.query.planner.nodes.JoinNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code RelToStageConverter} converts a logical {@link RelNode} to a {@link StageNode}.
+ */
+public final class RelToStageConverter {
+
+  private RelToStageConverter() {
+    // do not instantiate.
+  }
+
+  /**
+   * Converts a relational node into a stage node with just the expression piece.
+   *
+   * TODO: we should convert this to a more structured pattern once we 
determine the serialization format used.
+   *
+   * @param node relational node
+   * @return stage node.
+   */
+  public static StageNode toStageNode(RelNode node, String currentStageId) {
+    if (node instanceof LogicalCalc) {
+      return convertLogicalCal((LogicalCalc) node, currentStageId);
+    } else if (node instanceof LogicalTableScan) {
+      return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
+    } else if (node instanceof LogicalJoin) {
+      return convertLogicalJoin((LogicalJoin) node, currentStageId);
+    } else {
+      throw new UnsupportedOperationException("Unsupported logical plan node: 
" + node);
+    }
+  }
+
+  private static StageNode convertLogicalTableScan(LogicalTableScan node, 
String currentStageId) {
+    return new TableScanNode(node, currentStageId);
+  }
+
+  private static StageNode convertLogicalCal(LogicalCalc node, String 
currentStageId) {
+    return new CalcNode(node, currentStageId);
+  }
+
+  private static StageNode convertLogicalJoin(LogicalJoin node, String 
currentStageId) {
+    return new JoinNode(node, currentStageId);
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
new file mode 100644
index 0000000000..91eb5b77ca
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.planner.nodes.TableScanNode;
+
+
+/**
+ * The {@code StageMetadata} contains the information required for dispatching a particular stage.
+ *
+ * <p>It contains information about:
+ * <ul>
+ *   <li>the tables the stage is supposed to scan</li>
+ *   <li>the underlying segments the stage is required to execute upon</li>
+ *   <li>the server instances on which the stage should be executed</li>
+ * </ul>
+ */
+public class StageMetadata implements Serializable {
+  private List<String> _scannedTables;
+
+  // used for assigning server/worker nodes.
+  private List<ServerInstance> _serverInstances;
+
+  // used for table scan stage.
+  private Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+
+  public StageMetadata() {
+    _scannedTables = new ArrayList<>();
+    _serverInstances = new ArrayList<>();
+    _serverInstanceToSegmentsMap = new HashMap<>();
+  }
+
+  public void attach(StageNode stageNode) {
+    if (stageNode instanceof TableScanNode) {
+      _scannedTables.add(((TableScanNode) stageNode).getTableName().get(0));
+    }
+  }
+
+  public List<String> getScannedTables() {
+    return _scannedTables;
+  }
+
+  // -----------------------------------------------
+  // attached physical plan context.
+  // -----------------------------------------------
+
+  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+    return _serverInstanceToSegmentsMap;
+  }
+
+  public void setServerInstanceToSegmentsMap(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap) {
+    _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
+  }
+
+  public List<ServerInstance> getServerInstances() {
+    return _serverInstances;
+  }
+
+  public void setServerInstances(List<ServerInstance> serverInstances) {
+    _serverInstances = serverInstances;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
new file mode 100644
index 0000000000..175e77f6ef
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.nodes.MailboxReceiveNode;
+import org.apache.pinot.query.planner.nodes.MailboxSendNode;
+import org.apache.pinot.query.planner.nodes.StageNode;
+import org.apache.pinot.query.routing.WorkerManager;
+
+
+/**
+ * The {@code StagePlanner} walks top-down from {@link RelRoot} and constructs a forest of trees made of
+ * {@link StageNode}.
+ *
+ * <p>This class is not thread-safe. Do not reuse the stage planner for multiple query plans.
+ */
+public class StagePlanner {
+  private final PlannerContext _plannerContext;
+  private final WorkerManager _workerManager;
+
+  private Map<String, StageNode> _queryStageMap;
+  private Map<String, StageMetadata> _stageMetadataMap;
+  private int _stageIdCounter;
+
+  public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
+    _plannerContext = plannerContext;
+    _workerManager = workerManager;
+  }
+
+  /**
+   * Construct the dispatchable plan from relational logical plan.
+   *
+   * @param relRoot relational plan root.
+   * @return dispatchable plan.
+   */
+  public QueryPlan makePlan(RelNode relRoot) {
+    // clear the state
+    _queryStageMap = new HashMap<>();
+    _stageMetadataMap = new HashMap<>();
+    _stageIdCounter = 0;
+
+    // walk the plan and create stages.
+    StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+
+    // global root needs to send results back to the ROOT, a.k.a. the client 
response node.
+    // the last stage is always a broadcast-gather.
+    StageNode globalReceiverNode =
+        new MailboxReceiveNode("ROOT", globalStageRoot.getStageId(), 
RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot, 
globalReceiverNode.getStageId(),
+        RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
+    StageMetadata stageMetadata = 
_stageMetadataMap.get(globalSenderNode.getStageId());
+    stageMetadata.attach(globalSenderNode);
+
+    _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
+    StageMetadata globalReceivingStageMetadata = new StageMetadata();
+    globalReceivingStageMetadata.attach(globalReceiverNode);
+    _stageMetadataMap.put(globalReceiverNode.getStageId(), 
globalReceivingStageMetadata);
+
+    // assign workers to each stage.
+    for (Map.Entry<String, StageMetadata> e : _stageMetadataMap.entrySet()) {
+      _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
+    }
+
+    return new QueryPlan(_queryStageMap, _stageMetadataMap);
+  }
+
+  // non-threadsafe
+  private StageNode walkRelPlan(RelNode node, String currentStageId) {
+    if (isExchangeNode(node)) {
+      // 1. an exchange node always has exactly one input; convert that input into a new stage root.
+      StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
+      RelDistribution.Type exchangeType = ((LogicalExchange) 
node).distribution.getType();
+
+      // 2. make an exchange sender and receiver node pair
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getStageId(), exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot, 
mailboxReceiver.getStageId(), exchangeType);
+
+      // 3. put the sender side as a completed stage.
+      _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
+
+      // 4. return the receiver (this is considered a "virtual table scan" node by its parent).
+      return mailboxReceiver;
+    } else {
+      StageNode stageNode = RelToStageConverter.toStageNode(node, 
currentStageId);
+      List<RelNode> inputs = node.getInputs();
+      for (RelNode input : inputs) {
+        stageNode.addInput(walkRelPlan(input, currentStageId));
+      }
+      StageMetadata stageMetadata = 
_stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata());
+      stageMetadata.attach(stageNode);
+      return stageNode;
+    }
+  }
+
+  private boolean isExchangeNode(RelNode node) {
+    return (node instanceof LogicalExchange);
+  }
+
+  private String getNewStageId() {
+    return String.valueOf(_stageIdCounter++);
+  }
+}
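
A minimal usage sketch, assuming an already-optimized RelNode plus the PlannerContext and WorkerManager that are wired up elsewhere in this patch (the helper class below is hypothetical):

    import org.apache.calcite.rel.RelNode;
    import org.apache.pinot.query.context.PlannerContext;
    import org.apache.pinot.query.planner.QueryPlan;
    import org.apache.pinot.query.planner.StagePlanner;
    import org.apache.pinot.query.routing.WorkerManager;

    public class StagePlannerUsage {
      // optimizedRoot is assumed to be the RelNode produced by LogicalPlanner after the rule set ran.
      public static QueryPlan plan(RelNode optimizedRoot, PlannerContext plannerContext, WorkerManager workerManager) {
        // a new planner per query, since StagePlanner is not thread-safe and keeps per-plan state
        StagePlanner stagePlanner = new StagePlanner(plannerContext, workerManager);
        return stagePlanner.makePlan(optimizedRoot);
      }
    }

For a single equality join between two tables, the resulting plan roughly consists of a "ROOT" reduce stage, stage "0" holding the join, and stages "1" and "2" holding the two table scans, each boundary connected by a mailbox send/receive pair.
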
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
new file mode 100644
index 0000000000..71701df762
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class AbstractStageNode implements StageNode {
+
+  protected final String _stageId;
+  protected final List<StageNode> _inputs;
+
+  public AbstractStageNode(String stageId) {
+    _stageId = stageId;
+    _inputs = new ArrayList<>();
+  }
+
+  @Override
+  public List<StageNode> getInputs() {
+    return _inputs;
+  }
+
+  @Override
+  public void addInput(StageNode stageNode) {
+    _inputs.add(stageNode);
+  }
+
+  @Override
+  public String getStageId() {
+    return _stageId;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
new file mode 100644
index 0000000000..4b4ca9165e
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import org.apache.calcite.rel.logical.LogicalCalc;
+
+
+public class CalcNode extends AbstractStageNode {
+  private final String _expression;
+
+  public CalcNode(LogicalCalc node, String currentStageId) {
+    super(currentStageId);
+    _expression = toExpression(node);
+  }
+
+  public String getExpression() {
+    return _expression;
+  }
+
+  private String toExpression(LogicalCalc node) {
+    // TODO: make it real.
+    return node.getDigest();
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
new file mode 100644
index 0000000000..520af693d6
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+
+
+public class JoinNode extends AbstractStageNode {
+  private final JoinRelType _joinType;
+  private final int _leftOperandIndex;
+  private final int _rightOperandIndex;
+  private final FieldSelectionKeySelector _leftFieldSelectionKeySelector;
+  private final FieldSelectionKeySelector _rightFieldSelectionKeySelector;
+
+  private transient final RelDataType _leftRowType;
+  private transient final RelDataType _rightRowType;
+
+  public JoinNode(LogicalJoin node, String currentStageId) {
+    super(currentStageId);
+    _joinType = node.getJoinType();
+    RexCall joinCondition = (RexCall) node.getCondition();
+    Preconditions.checkState(
+        joinCondition.getOperator().getKind().equals(SqlKind.EQUALS) && 
joinCondition.getOperands().size() == 2,
+        "only equality JOIN is supported");
+    Preconditions.checkState(joinCondition.getOperands().get(0) instanceof 
RexInputRef, "only reference supported");
+    Preconditions.checkState(joinCondition.getOperands().get(1) instanceof 
RexInputRef, "only reference supported");
+    _leftRowType = node.getLeft().getRowType();
+    _rightRowType = node.getRight().getRowType();
+    _leftOperandIndex = ((RexInputRef) 
joinCondition.getOperands().get(0)).getIndex();
+    _rightOperandIndex = ((RexInputRef) 
joinCondition.getOperands().get(1)).getIndex();
+    _leftFieldSelectionKeySelector = new 
FieldSelectionKeySelector(_leftOperandIndex);
+    _rightFieldSelectionKeySelector =
+        new FieldSelectionKeySelector(_rightOperandIndex - 
_leftRowType.getFieldNames().size());
+  }
+
+  public JoinRelType getJoinType() {
+    return _joinType;
+  }
+
+  public RelDataType getLeftRowType() {
+    return _leftRowType;
+  }
+
+  public RelDataType getRightRowType() {
+    return _rightRowType;
+  }
+
+  public int getLeftOperandIndex() {
+    return _leftOperandIndex;
+  }
+
+  public int getRightOperandIndex() {
+    return _rightOperandIndex;
+  }
+
+  public FieldSelectionKeySelector getLeftJoinKeySelector() {
+    return _leftFieldSelectionKeySelector;
+  }
+
+  public FieldSelectionKeySelector getRightJoinKeySelector() {
+    return _rightFieldSelectionKeySelector;
+  }
+}
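
The offset applied to the right key selector (subtracting the left row's field count) is easiest to see with concrete numbers; the table and column names below are made up for illustration:

    public class JoinKeyIndexDemo {
      public static void main(String[] args) {
        // For `A JOIN B ON A.a2 = B.b1`, Calcite numbers the RexInputRefs in the join condition
        // over the concatenated row [a0, a1, a2, b0, b1].
        int leftFieldCount = 3;      // A has fields a0, a1, a2
        int rightOperandIndex = 4;   // B.b1, as indexed in the concatenated row
        int rightLocalIndex = rightOperandIndex - leftFieldCount;
        // 1 -> b1 within B's own row, which is what the right-side key selector needs
        System.out.println(rightLocalIndex);
      }
    }
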
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
new file mode 100644
index 0000000000..947d449383
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+
+
+public class MailboxReceiveNode extends AbstractStageNode {
+
+  private final String _senderStageId;
+  private final RelDistribution.Type _exchangeType;
+
+  public MailboxReceiveNode(String stageId, String senderStageId, 
RelDistribution.Type exchangeType) {
+    super(stageId);
+    _senderStageId = senderStageId;
+    _exchangeType = exchangeType;
+  }
+
+  @Override
+  public List<StageNode> getInputs() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void addInput(StageNode stageNode) {
+    throw new UnsupportedOperationException("no input should be added to 
mailbox receive.");
+  }
+
+  public String getSenderStageId() {
+    return _senderStageId;
+  }
+
+  public RelDistribution.Type getExchangeType() {
+    return _exchangeType;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
new file mode 100644
index 0000000000..6db9578f35
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+
+
+public class MailboxSendNode extends AbstractStageNode {
+  private final StageNode _stageRoot;
+  private final String _receiverStageId;
+  private final RelDistribution.Type _exchangeType;
+
+  public MailboxSendNode(StageNode stageRoot, String receiverStageId, 
RelDistribution.Type exchangeType) {
+    super(stageRoot.getStageId());
+    _stageRoot = stageRoot;
+    _receiverStageId = receiverStageId;
+    _exchangeType = exchangeType;
+  }
+
+  @Override
+  public List<StageNode> getInputs() {
+    return Collections.singletonList(_stageRoot);
+  }
+
+  @Override
+  public void addInput(StageNode queryStageRoot) {
+    throw new UnsupportedOperationException("mailbox cannot be changed!");
+  }
+
+  public String getReceiverStageId() {
+    return _receiverStageId;
+  }
+
+  public RelDistribution.Type getExchangeType() {
+    return _exchangeType;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
new file mode 100644
index 0000000000..8fcbb5e01a
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/StageNode.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Stage Node is a serializable version of the {@link 
org.apache.calcite.rel.RelNode}.
+ *
+ * TODO: stage node currently uses java.io.Serializable as its serialization 
format.
+ * We should experiment with other type of serialization format for better 
performance.
+ * Essentially what we need is a way to exclude the planner context from the 
RelNode but only keeps the
+ * constructed relational content because we will no longer revisit the 
planner after stage is created.
+ */
+public interface StageNode extends Serializable {
+
+  List<StageNode> getInputs();
+
+  void addInput(StageNode stageNode);
+
+  String getStageId();
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
new file mode 100644
index 0000000000..0bb6e0f704
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+
+
+public class TableScanNode extends AbstractStageNode {
+  private final List<String> _tableName;
+  private final List<String> _tableScanColumns;
+
+  public TableScanNode(LogicalTableScan tableScan, String stageId) {
+    super(stageId);
+    _tableName = tableScan.getTable().getQualifiedName();
+    // TODO: optimize this; the table field is not directly usable as a name.
+    _tableScanColumns =
+        
tableScan.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
+  }
+
+  @Override
+  public List<StageNode> getInputs() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void addInput(StageNode queryStageRoot) {
+    throw new UnsupportedOperationException("TableScanNode cannot add input as 
it is a leaf node");
+  }
+
+  public List<String> getTableName() {
+    return _tableName;
+  }
+
+  public List<String> getTableScanColumns() {
+    return _tableScanColumns;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
new file mode 100644
index 0000000000..9e0c776fd5
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+import java.io.Serializable;
+
+
+/**
+ * The {@code FieldSelectionKeySelector} simply extracts a column value out of a row array ({@code Object[]}).
+ */
+public class FieldSelectionKeySelector implements KeySelector<Object[], 
Object>, Serializable {
+
+  private int _columnIndex;
+
+  public FieldSelectionKeySelector(int columnIndex) {
+    _columnIndex = columnIndex;
+  }
+
+  @Override
+  public Object getKey(Object[] input) {
+    return input[_columnIndex];
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
new file mode 100644
index 0000000000..79dc987d9d
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.partitioning;
+
+/**
+ * The {@code KeySelector} provides a partitioning function to encode a 
specific input data type into a key.
+ *
+ * <p>This key selector is used for computations such as GROUP BY or equality JOINs.
+ *
+ * <p>A key selector should always produce the same selection hash key when the same input is provided.
+ */
+public interface KeySelector<IN, OUT> {
+
+  /**
+   * Extract the key out of an input data construct.
+   *
+   * @param input input data.
+   * @return the key of the input data.
+   */
+  OUT getKey(IN input);
+}
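
Any deterministic function of the input can serve as a key selector. A hypothetical multi-column implementation, just to illustrate the contract alongside the single-column FieldSelectionKeySelector above:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.query.planner.partitioning.KeySelector;

    // Hypothetical example: a selector keyed on several columns. The only requirement is
    // determinism -- the same input row must always map to the same key.
    public class MultiFieldKeySelector implements KeySelector<Object[], List<Object>>, Serializable {
      private final int[] _columnIndices;

      public MultiFieldKeySelector(int... columnIndices) {
        _columnIndices = columnIndices;
      }

      @Override
      public List<Object> getKey(Object[] input) {
        List<Object> key = new ArrayList<>(_columnIndices.length);
        for (int columnIndex : _columnIndices) {
          key.add(input[columnIndex]);
        }
        return key;
      }
    }

For example, new MultiFieldKeySelector(0, 2).getKey(new Object[]{"a", 1, 2.0}) yields ["a", 2.0].
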
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
new file mode 100644
index 0000000000..7ad6ee0b9c
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * WorkerInstance is a wrapper around {@link ServerInstance}.
+ *
+ * <p>This can be considered a simplified version that directly enables host-port initialization.
+ */
+public class WorkerInstance extends ServerInstance {
+
+  public WorkerInstance(InstanceConfig instanceConfig) {
+    super(instanceConfig);
+  }
+
+  public WorkerInstance(String hostname, int serverPort, int mailboxPort) {
+    super(toInstanceConfig(hostname, serverPort, mailboxPort));
+  }
+
+  private static InstanceConfig toInstanceConfig(String hostname, int 
serverPort, int mailboxPort) {
+    String server = String.format("%s_%d", hostname, serverPort);
+    InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server);
+    ZNRecord znRecord = instanceConfig.getRecord();
+    Map<String, String> simpleFields = znRecord.getSimpleFields();
+    simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, 
String.valueOf(mailboxPort));
+    return instanceConfig;
+  }
+}
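
A quick construction sketch (the hostname and port numbers are arbitrary placeholders):

    import org.apache.pinot.query.routing.WorkerInstance;

    public class WorkerInstanceDemo {
      public static void main(String[] args) {
        // server (Netty) port 8098, mailbox (gRPC) port 8099 -- both made up for the example
        WorkerInstance worker = new WorkerInstance("localhost", 8098, 8099);
        // the mailbox port rides along as the gRPC port in the underlying InstanceConfig
        System.out.println(worker.getGrpcPort());  // expected: 8099
      }
    }
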
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
new file mode 100644
index 0000000000..a3102580b6
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.routing;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+
+
+/**
+ * The {@code WorkerManager} manages stage to worker assignment.
+ *
+ * <p>It contains the logic to assign workers to a particular stage. If it is a leaf stage, the logic falls back to
+ * how Pinot assigns servers and the server-segment mapping.
+ *
+ * TODO: Currently it is implemented by wrapping the routing manager from the Pinot broker. However, we can abstract
+ * out the worker manager later when we split out the query-spi layer.
+ */
+public class WorkerManager {
+  private static final CalciteSqlCompiler CALCITE_SQL_COMPILER = new 
CalciteSqlCompiler();
+
+  private final String _hostName;
+  private final int _port;
+  private final RoutingManager _routingManager;
+
+  public WorkerManager(String hostName, int port, RoutingManager 
routingManager) {
+    _hostName = hostName;
+    _port = port;
+    _routingManager = routingManager;
+  }
+
+  public void assignWorkerToStage(String stageId, StageMetadata stageMetadata) 
{
+    List<String> scannedTables = stageMetadata.getScannedTables();
+    if (scannedTables.size() == 1) { // table scan stage, need to attach 
server as well as segment info.
+      RoutingTable routingTable = getRoutingTable(scannedTables.get(0));
+      Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = 
routingTable.getServerInstanceToSegmentsMap();
+      stageMetadata.setServerInstances(new 
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
+      stageMetadata.setServerInstanceToSegmentsMap(new 
HashMap<>(serverInstanceToSegmentsMap));
+    } else if (stageId.equalsIgnoreCase("ROOT")) {
+      // ROOT stage doesn't have a QueryServer as it is strictly only reducing 
results.
+      // here we simply assign the worker instance with identical 
server/mailbox port number.
+      stageMetadata.setServerInstances(Lists.newArrayList(new 
WorkerInstance(_hostName, _port, _port)));
+    } else {
+      
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
+    }
+  }
+
+  private static List<ServerInstance> filterServers(Collection<ServerInstance> 
servers) {
+    List<ServerInstance> serverInstances = new ArrayList<>();
+    for (ServerInstance server : servers) {
+      String hostname = server.getHostname();
+      if 
(!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && 
!hostname.startsWith(
+          CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) && 
!hostname.startsWith(
+          CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE) && 
server.getGrpcPort() > 0) {
+        serverInstances.add(server);
+      }
+    }
+    return serverInstances;
+  }
+
+  private RoutingTable getRoutingTable(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    // TODO: support both offline and realtime, now we hard code offline table.
+    String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName);
+    return 
_routingManager.getRoutingTable(CALCITE_SQL_COMPILER.compileToBrokerRequest(
+        "SELECT * FROM " + tableNameWithType));
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..2b35613f7a
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot: always insert exchange nodes on both inputs of a JOIN.
+ */
+public class PinotExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotExchangeNodeInsertRule INSTANCE =
+      new PinotExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+  public PinotExchangeNodeInsertRule(RelBuilderFactory factory) {
+    super(operand(LogicalJoin.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1) {
+      return false;
+    }
+    if (call.rel(0) instanceof Join) {
+      Join join = call.rel(0);
+      return !isExchange(join.getLeft()) && !isExchange(join.getRight());
+    }
+    return false;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+    RelNode leftInput = join.getInput(0);
+    RelNode rightInput = join.getInput(1);
+
+    RelNode leftExchange = LogicalExchange.create(leftInput, 
RelDistributions.SINGLETON);
+    RelNode rightExchange = LogicalExchange.create(rightInput, 
RelDistributions.BROADCAST_DISTRIBUTED);
+
+    RelNode newJoinNode =
+        new LogicalJoin(join.getCluster(), join.getTraitSet(), leftExchange, 
rightExchange, join.getCondition(),
+            join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
+            ImmutableList.copyOf(join.getSystemFieldList()));
+
+    call.transformTo(newJoinNode);
+  }
+
+  private static boolean isExchange(RelNode rel) {
+    RelNode reference = rel;
+    if (reference instanceof HepRelVertex) {
+      reference = ((HepRelVertex) reference).getCurrentRel();
+    }
+    return reference instanceof Exchange;
+  }
+}
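
To see the rule fire in isolation, it can be run through Calcite's HepPlanner directly; a minimal sketch, assuming logicalPlan is a RelNode tree that contains a LogicalJoin:

    import org.apache.calcite.plan.hep.HepPlanner;
    import org.apache.calcite.plan.hep.HepProgram;
    import org.apache.calcite.plan.hep.HepProgramBuilder;
    import org.apache.calcite.rel.RelNode;
    import org.apache.pinot.query.rules.PinotExchangeNodeInsertRule;

    public class ExchangeRuleRunner {
      // logicalPlan is assumed to come from an earlier parse/validate/convert step.
      public static RelNode insertExchanges(RelNode logicalPlan) {
        HepProgram program = new HepProgramBuilder()
            .addRuleInstance(PinotExchangeNodeInsertRule.INSTANCE)
            .build();
        HepPlanner planner = new HepPlanner(program);
        planner.setRoot(logicalPlan);
        // After this, every matched LogicalJoin has its left input wrapped in a SINGLETON exchange
        // and its right input wrapped in a BROADCAST_DISTRIBUTED exchange.
        return planner.findBestExp();
      }
    }
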
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
new file mode 100644
index 0000000000..1b4e0850ac
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.rules;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.PruneEmptyRules;
+
+
+/**
+ * Default rule sets for Pinot query
+ */
+public class PinotQueryRuleSets {
+  private PinotQueryRuleSets() {
+    // do not instantiate.
+  }
+
+  public static final Collection<RelOptRule> LOGICAL_OPT_RULES =
+      Arrays.asList(EnumerableRules.ENUMERABLE_FILTER_RULE, 
EnumerableRules.ENUMERABLE_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_PROJECT_RULE, 
EnumerableRules.ENUMERABLE_SORT_RULE,
+          EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
+
+          // push a filter into a join
+          CoreRules.FILTER_INTO_JOIN,
+          // push filter through an aggregation
+          CoreRules.FILTER_AGGREGATE_TRANSPOSE,
+          // push filter through set operation
+          CoreRules.FILTER_SET_OP_TRANSPOSE,
+          // push project through set operation
+          CoreRules.PROJECT_SET_OP_TRANSPOSE,
+
+          // aggregation and projection rules
+          CoreRules.AGGREGATE_PROJECT_MERGE, 
CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS,
+          // push a projection past a filter or vice versa
+          CoreRules.PROJECT_FILTER_TRANSPOSE, 
CoreRules.FILTER_PROJECT_TRANSPOSE,
+          // push a projection to the children of a join
+          // push all expressions to handle the time indicator correctly
+          CoreRules.JOIN_CONDITION_PUSH,
+          // merge projections
+          CoreRules.PROJECT_MERGE,
+          // remove identity project
+          CoreRules.PROJECT_REMOVE,
+          // reorder sort and projection
+          CoreRules.SORT_PROJECT_TRANSPOSE,
+
+          // join rules
+          CoreRules.JOIN_PUSH_EXPRESSIONS,
+
+          // convert non-all union into all-union + distinct
+          CoreRules.UNION_TO_DISTINCT,
+
+          // remove aggregation if it does not aggregate and input is already 
distinct
+          CoreRules.AGGREGATE_REMOVE,
+          // push aggregate through join
+          CoreRules.AGGREGATE_JOIN_TRANSPOSE,
+          // aggregate union rule
+          CoreRules.AGGREGATE_UNION_AGGREGATE,
+
+          // reduce aggregate functions like AVG, STDDEV_POP etc.
+          CoreRules.AGGREGATE_REDUCE_FUNCTIONS,
+
+          // remove unnecessary sort rule
+          CoreRules.SORT_REMOVE,
+
+          // prune empty results rules
+          PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, 
PruneEmptyRules.JOIN_LEFT_INSTANCE,
+          PruneEmptyRules.JOIN_RIGHT_INSTANCE, 
PruneEmptyRules.PROJECT_INSTANCE, PruneEmptyRules.SORT_INSTANCE,
+          PruneEmptyRules.UNION_INSTANCE,
+
+          // Pinot specific rules
+          PinotExchangeNodeInsertRule.INSTANCE);
+}
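
A sketch of how this rule collection could plug into the LogicalPlanner added earlier in this patch (the wiring class below is illustrative and not part of this patch):

    import org.apache.calcite.plan.Contexts;
    import org.apache.calcite.plan.RelOptRule;
    import org.apache.calcite.plan.hep.HepProgramBuilder;
    import org.apache.pinot.query.planner.LogicalPlanner;
    import org.apache.pinot.query.rules.PinotQueryRuleSets;

    public class RuleSetWiring {
      public static LogicalPlanner buildPlanner() {
        HepProgramBuilder builder = new HepProgramBuilder();
        for (RelOptRule rule : PinotQueryRuleSets.LOGICAL_OPT_RULES) {
          builder.addRuleInstance(rule);
        }
        // Contexts.EMPTY_CONTEXT keeps the example self-contained; the real planner may pass a richer Context.
        return new LogicalPlanner(builder.build(), Contexts.EMPTY_CONTEXT);
      }
    }
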
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
new file mode 100644
index 0000000000..55797b0e52
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.type;
+
+import java.util.Map;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Extends the Java-based TypeFactory from Calcite.
+ *
+ * <p>{@link JavaTypeFactoryImpl} is used here because we are not overriding much of the TypeFactory methods
+ * required by Calcite. We will start extending {@link SqlTypeFactoryImpl} or even {@link RelDataTypeFactory}
+ * when it becomes necessary for Pinot to override such mechanisms.
+ *
+ * <p>Note that {@link JavaTypeFactoryImpl} is subject to change. Please pay extra attention to this class when
+ * upgrading Calcite versions.
+ */
+public class TypeFactory extends JavaTypeFactoryImpl {
+  private final RelDataTypeSystem _typeSystem;
+
+  public TypeFactory(RelDataTypeSystem typeSystem) {
+    _typeSystem = typeSystem;
+  }
+
+  public RelDataType createRelDataTypeFromSchema(Schema schema) {
+    Builder builder = new Builder(this);
+    for (Map.Entry<String, FieldSpec> e : schema.getFieldSpecMap().entrySet()) 
{
+      builder.add(e.getKey(), toRelDataType(e.getValue()));
+    }
+    return builder.build();
+  }
+
+  private RelDataType toRelDataType(FieldSpec fieldSpec) {
+    switch (fieldSpec.getDataType()) {
+      case INT:
+        return createSqlType(SqlTypeName.INTEGER);
+      case LONG:
+        return createSqlType(SqlTypeName.BIGINT);
+      case FLOAT:
+        return createSqlType(SqlTypeName.FLOAT);
+      case DOUBLE:
+        return createSqlType(SqlTypeName.DOUBLE);
+      case BOOLEAN:
+        return createSqlType(SqlTypeName.BOOLEAN);
+      case TIMESTAMP:
+        return createSqlType(SqlTypeName.TIMESTAMP);
+      case STRING:
+        return createSqlType(SqlTypeName.VARCHAR);
+      case BYTES:
+        return createSqlType(SqlTypeName.VARBINARY);
+      case JSON:
+        // TODO: support JSON. JSON should be supported using a special RelDataType as it is not a simple String,
+        // nor can it be easily parsed as a STRUCT.
+      case LIST:
+        // TODO: support LIST; MV columns should fall into this category.
+      case STRUCT:
+      case MAP:
+      default:
+        throw new UnsupportedOperationException("Unsupported data type: " + fieldSpec.getDataType());
+    }
+  }
+}
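
A minimal usage sketch for the factory above, assuming a small Pinot schema built with Schema.SchemaBuilder; the class name and the expected output comment are illustrative only.

import org.apache.calcite.rel.type.RelDataType;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;

// Illustrative sketch only: convert a Pinot schema into a Calcite row type.
public final class TypeFactoryUsageSketch {
  public static void main(String[] args) {
    Schema schema = new Schema.SchemaBuilder()
        .setSchemaName("a")
        .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
        .addMetric("col3", FieldSpec.DataType.INT)
        .build();
    TypeFactory typeFactory = new TypeFactory(new TypeSystem());
    RelDataType rowType = typeFactory.createRelDataTypeFromSchema(schema);
    // Prints the derived row type, e.g. a record with a VARCHAR col1 and an INTEGER col3.
    System.out.println(rowType.getFullTypeString());
  }
}
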
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
new file mode 100644
index 0000000000..b2606d9296
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.type;
+
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+
+
+/**
+ * The {@code TypeSystem} overrides the Calcite type system with Pinot-specific logic.
+ *
+ * TODO: no overrides for now.
+ */
+public class TypeSystem extends RelDataTypeSystemImpl {
+}
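
Since there are no overrides yet, the following is purely a hypothetical illustration of what a Pinot-specific override could eventually look like; the overridden method and the chosen precision value are assumptions, not part of this PR.

import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

// Hypothetical illustration only: a future Pinot-specific type-system override might look like this.
public class TypeSystemSketch extends RelDataTypeSystemImpl {
  @Override
  public int getMaxNumericPrecision() {
    // Example override: widen the default numeric precision (value chosen arbitrarily for illustration).
    return 38;
  }
}
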
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
new file mode 100644
index 0000000000..f774976c21
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.validate;
+
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
+
+
+/**
+ * The {@code Validator} overrides Calcite's SqlValidator with Pinot-specific logic.
+ */
+public class Validator extends SqlValidatorImpl {
+
+  public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
+    // TODO: support BABEL validator. Currently parser conformance is set to use BABEL.
+    super(opTab, catalogReader, typeFactory, Config.DEFAULT);
+  }
+}
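
A minimal usage sketch, assuming the operator table, catalog reader and type factory are already wired up elsewhere (as QueryEnvironment does in this PR); validating a parsed SqlNode is then a single call. The helper class name is an assumption.

import org.apache.calcite.sql.SqlNode;
import org.apache.pinot.query.validate.Validator;

// Illustrative sketch only: run Calcite validation over an already-parsed SqlNode.
public final class ValidatorUsageSketch {
  public static SqlNode validate(Validator validator, SqlNode parsed) {
    // SqlValidatorImpl#validate resolves identifiers against the catalog and derives expression types.
    return validator.validate(parsed);
  }
}
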
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
new file mode 100644
index 0000000000..911469644b
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableList;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.externalize.RelXmlWriter;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.catalog.PinotCatalog;
+import org.apache.pinot.query.context.PlannerContext;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class QueryEnvironmentTest {
+  private QueryEnvironment _queryEnvironment;
+
+  @BeforeClass
+  public void setUp() {
+    // the port doesn't matter as we are not actually making a server call.
+    RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(1, 2);
+    _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
+        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
+        new WorkerManager("localhost", 3, routingManager));
+  }
+
+  @Test
+  public void testSqlStrings()
+      throws Exception {
+    testQueryParsing("SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0",
+        "SELECT *\n" + "FROM `a`\n" + "INNER JOIN `b` ON `a`.`col1` = `b`.`col2`\n" + "WHERE `a`.`col3` >= 0");
+  }
+
+  @Test
+  public void testQueryToStages()
+      throws Exception {
+    PlannerContext plannerContext = new PlannerContext();
+    String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2";
+    QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+    Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4);
+    Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4);
+    for (Map.Entry<String, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
+      List<String> tables = e.getValue().getScannedTables();
+      if (tables.size() == 1) {
+        // table scan stages; table "a" should have 2 hosts, table "b" should have only 1
+        Assert.assertEquals(
+            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+            tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1", "Server_localhost_2")
+                : ImmutableList.of("Server_localhost_1"));
+      } else if (!e.getKey().equals("ROOT")) {
+        // join stage should have both servers used.
+        Assert.assertEquals(
+            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+            ImmutableList.of("Server_localhost_1", "Server_localhost_2"));
+      } else {
+        // reduce stage should have the reducer instance.
+        Assert.assertEquals(
+            e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
+            ImmutableList.of("Server_localhost_3"));
+      }
+    }
+  }
+
+  @Test
+  public void testQueryToRel()
+      throws Exception {
+    PlannerContext plannerContext = new PlannerContext();
+    String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0";
+    SqlNode parsed = _queryEnvironment.parse(query, plannerContext);
+    SqlNode validated = _queryEnvironment.validate(parsed);
+    RelRoot relRoot = _queryEnvironment.toRelation(validated, plannerContext);
+    RelNode optimized = _queryEnvironment.optimize(relRoot, plannerContext);
+
+    // Assert that the relational plan can be written into an ALL_ATTRIBUTES digest.
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    RelWriter planWriter = new RelXmlWriter(pw, SqlExplainLevel.ALL_ATTRIBUTES);
+    optimized.explain(planWriter);
+    Assert.assertNotNull(sw.toString());
+  }
+
+  private void testQueryParsing(String query, String digest)
+      throws Exception {
+    PlannerContext plannerContext = new PlannerContext();
+    SqlNode sqlNode = _queryEnvironment.parse(query, plannerContext);
+    _queryEnvironment.validate(sqlNode);
+    Assert.assertEquals(sqlNode.toString(), digest);
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
new file mode 100644
index 0000000000..cc0db385db
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.catalog.PinotCatalog;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.routing.WorkerInstance;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Query environment test utilities that provide mock tables and schemas so that
+ * we can run simple query planning and produce stages / metadata for other components to test.
+ */
+public class QueryEnvironmentTestUtils {
+  public static final Schema.SchemaBuilder SCHEMA_BUILDER;
+  public static final Map<String, List<String>> SERVER1_SEGMENTS =
+      ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b", Lists.newArrayList("b1"), "c",
+          Lists.newArrayList("c1"));
+  public static final Map<String, List<String>> SERVER2_SEGMENTS =
+      ImmutableMap.of("a", Lists.newArrayList("a3"), "c", Lists.newArrayList("c2", "c3"));
+
+  static {
+    SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
+        .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
+        .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
+        .addMetric("col3", FieldSpec.DataType.INT, 0);
+  }
+
+  private QueryEnvironmentTestUtils() {
+    // do not instantiate.
+  }
+
+  public static TableCache mockTableCache() {
+    TableCache mock = mock(TableCache.class);
+    when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a", "a", "b", "b", "c", "c"));
+    when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build());
+    when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build());
+    when(mock.getSchema("c")).thenReturn(SCHEMA_BUILDER.setSchemaName("c").build());
+    return mock;
+  }
+
+  public static QueryEnvironment getQueryEnvironment(int reducerPort, int port1, int port2) {
+    RoutingManager routingManager = QueryEnvironmentTestUtils.getMockRoutingManager(port1, port2);
+    return new QueryEnvironment(new TypeFactory(new TypeSystem()),
+        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(QueryEnvironmentTestUtils.mockTableCache())),
+        new WorkerManager("localhost", reducerPort, routingManager));
+  }
+
+  public static RoutingManager getMockRoutingManager(int port1, int port2) {
+    String server1 = String.format("localhost_%d", port1);
+    String server2 = String.format("localhost_%d", port2);
+    // this doesn't test the QueryServer functionality, so the server port can be the same as the mailbox port.
+    // this is only used for test identification purposes.
+    ServerInstance host1 = new WorkerInstance("localhost", port1, port1);
+    ServerInstance host2 = new WorkerInstance("localhost", port2, port2);
+
+    RoutingTable rtA = mock(RoutingTable.class);
+    when(rtA.getServerInstanceToSegmentsMap()).thenReturn(
+        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("a"), host2, SERVER2_SEGMENTS.get("a")));
+    RoutingTable rtB = mock(RoutingTable.class);
+    when(rtB.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1, SERVER1_SEGMENTS.get("b")));
+    RoutingTable rtC = mock(RoutingTable.class);
+    when(rtC.getServerInstanceToSegmentsMap()).thenReturn(
+        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("c"), host2, SERVER2_SEGMENTS.get("c")));
+    Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA, "b", rtB, "c", rtC);
+    RoutingManager mock = mock(RoutingManager.class);
+    when(mock.getRoutingTable(any())).thenAnswer(invocation -> {
+      BrokerRequest brokerRequest = invocation.getArgument(0);
+      String tableName = brokerRequest.getPinotQuery().getDataSource().getTableName();
+      return mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName));
+    });
+    when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1, host1, server2, host2));
+    return mock;
+  }
+
+  public static String getTestStageByServerCount(QueryPlan queryPlan, int serverCount) {
+    List<String> stageIds = queryPlan.getStageMetadataMap().entrySet().stream()
+        .filter(e -> !e.getKey().equals("ROOT") && e.getValue().getServerInstances().size() == serverCount)
+        .map(Map.Entry::getKey).collect(Collectors.toList());
+    return stageIds.size() > 0 ? stageIds.get(0) : null;
+  }
+
+  public static int getAvailablePort() {
+    try {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        return socket.getLocalPort();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to find an available port to use", e);
+    }
+  }
+}
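
A minimal sketch of how these utilities combine in a test, mirroring the calls in QueryEnvironmentTest; the wrapper class name is an assumption, while getAvailablePort, getQueryEnvironment, getTestStageByServerCount and planQuery all come from the code in this PR.

import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
import org.apache.pinot.query.planner.QueryPlan;

// Illustrative sketch only: plan a query against the mocked routing/table setup and pick a stage.
public final class TestUtilsUsageSketch {
  public static void main(String[] args)
      throws Exception {
    int port1 = QueryEnvironmentTestUtils.getAvailablePort();
    int port2 = QueryEnvironmentTestUtils.getAvailablePort();
    int reducerPort = QueryEnvironmentTestUtils.getAvailablePort();
    QueryEnvironment env = QueryEnvironmentTestUtils.getQueryEnvironment(reducerPort, port1, port2);
    QueryPlan plan = env.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
    // The stage routed to both mock servers is expected to be the join stage.
    String stageId = QueryEnvironmentTestUtils.getTestStageByServerCount(plan, 2);
    System.out.println("Two-server stage: " + stageId);
  }
}
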
diff --git a/pom.xml b/pom.xml
index 8fa7ab76c1..56e6ade7e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,8 @@
     <module>pinot-segment-local</module>
     <module>pinot-compatibility-verifier</module>
     <module>contrib/pinot-fmpp-maven-plugin</module>
+
+    <module>pinot-query-planner</module>
   </modules>
 
   <licenses>
@@ -966,10 +968,6 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-dbcp2</artifactId>
           </exclusion>
-          <exclusion>
-            <groupId>com.esri.geometry</groupId>
-            <artifactId>esri-geometry-api</artifactId>
-          </exclusion>
           <exclusion>
             <groupId>com.fasterxml.jackson.dataformat</groupId>
             <artifactId>jackson-dataformat-yaml</artifactId>

