This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/multi_stage_query_engine by 
this push:
     new 82623d8f71 Merge Multistage Engine back to main (#8720)
82623d8f71 is described below

commit 82623d8f71683045937a446c85e97edb994b9612
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Jun 3 08:56:46 2022 -1000

    Merge Multistage Engine back to main (#8720)
    
    * add e2e quickstart;
    
    all commits should squash into this until PR created
    
    list of work items:
    - modified the entrypoint in server/broker to run dispatcher and 
query-worker
    - modified the pinot-tools configs to start it properly
    
    * more merge back contents
    
    1. added broker request handler delegate, that redirects to one of the 3 
handlers
    2. making multi-stage enigne service port and mailbox port separate Helix 
ZNode field
    3. fix several bugs
    
    * revert changes to conf from quickstart demo. and fix tests
    
    * address diff comments
    
    * address diff comments
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 pinot-broker/pom.xml                               |   8 +
 .../broker/broker/helix/BaseBrokerStarter.java     |  30 ++-
 .../requesthandler/BaseBrokerRequestHandler.java   |   2 +-
 .../BrokerRequestHandlerDelegate.java              |  84 +++++++++
 .../requesthandler/GrpcBrokerRequestHandler.java   |   4 +
 .../MultiStageBrokerRequestHandler.java            | 206 +++++++++++++++++++++
 .../SingleConnectionBrokerRequestHandler.java      |   1 +
 .../pinot/common/utils/config/InstanceUtils.java   |  12 ++
 .../common/utils/config/InstanceUtilsTest.java     |  26 ++-
 .../resources/PinotInstanceRestletResource.java    |  28 +++
 .../api/PinotInstanceRestletResourceTest.java      |  15 +-
 .../PinotHelixResourceManagerStatelessTest.java    |   5 +-
 .../helix/core/PinotHelixResourceManagerTest.java  |   2 +-
 .../pinot/core/transport/ServerInstance.java       |  17 ++
 .../apache/pinot/query/routing/WorkerInstance.java |  15 +-
 .../apache/pinot/query/routing/WorkerManager.java  |   9 +-
 .../pinot/query/QueryEnvironmentTestUtils.java     |   4 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   2 +-
 .../runtime/operator/MailboxSendOperator.java      |   2 +-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    |   7 +-
 .../pinot/query/service/QueryDispatcher.java       |  19 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   6 +-
 .../pinot/query/service/QueryServerTest.java       |   3 +-
 pinot-server/pom.xml                               |   8 +
 .../org/apache/pinot/server/conf/ServerConf.java   |   5 +
 .../pinot/server/starter/ServerInstance.java       |  65 ++++---
 .../pinot/server/worker/WorkerQueryServer.java     | 102 ++++++++++
 .../apache/pinot/spi/config/instance/Instance.java |  23 +++
 .../apache/pinot/spi/utils/CommonConstants.java    |  12 ++
 29 files changed, 656 insertions(+), 66 deletions(-)

diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index d354cc7c28..f9bc5da067 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -57,6 +57,14 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-planner</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-runtime</artifactId>
+    </dependency>
 
     <!-- Jersey & Swagger -->
     <dependency>
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 45e918875e..351ce1ea7c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -41,7 +41,9 @@ import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
 import 
org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
 import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
 import 
org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.Utils;
@@ -251,24 +253,38 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, 
Broker.BROKER_TLS_PREFIX);
     NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, 
Broker.BROKER_NETTY_PREFIX);
 
-    if (_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, 
Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE)
-        .equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
-      LOGGER.info("Starting Grpc BrokerRequestHandler.");
-      _brokerRequestHandler =
+    // Create Broker request handler.
+    String brokerRequestHandlerType =
+        _brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, 
Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
+    BrokerRequestHandler singleStageBrokerRequestHandler = null;
+    if 
(brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE))
 {
+      singleStageBrokerRequestHandler =
           new GrpcBrokerRequestHandler(_brokerConf, _routingManager, 
_accessControlFactory, queryQuotaManager,
               tableCache, _brokerMetrics, null);
     } else { // default request handler type, e.g. netty
-      LOGGER.info("Starting Netty BrokerRequestHandler.");
       if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
-        _brokerRequestHandler =
+        singleStageBrokerRequestHandler =
             new SingleConnectionBrokerRequestHandler(_brokerConf, 
_routingManager, _accessControlFactory,
                 queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
tlsDefaults);
       } else {
-        _brokerRequestHandler =
+        singleStageBrokerRequestHandler =
             new SingleConnectionBrokerRequestHandler(_brokerConf, 
_routingManager, _accessControlFactory,
                 queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, 
null);
       }
     }
+
+    MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
+    if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, 
Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
+      // multi-stage request handler uses both Netty and GRPC ports.
+      // worker requires both the "Netty port" for protocol transport; and 
"GRPC port" for mailbox transport.
+      // TODO: decouple protocol and engine selection.
+      multiStageBrokerRequestHandler =
+          new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, 
_accessControlFactory, queryQuotaManager,
+              tableCache, _brokerMetrics);
+    }
+
+    _brokerRequestHandler = new 
BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
+        multiStageBrokerRequestHandler);
     _brokerRequestHandler.start();
     String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
     if (controllerUrl != null) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bdb647570a..1d2dd7ead8 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -1612,7 +1612,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       RequestContext requestContext)
       throws Exception;
 
-  private static void augmentStatistics(RequestContext statistics, 
BrokerResponse response) {
+  protected static void augmentStatistics(RequestContext statistics, 
BrokerResponse response) {
     statistics.setTotalDocs(response.getTotalDocs());
     statistics.setNumDocsScanned(response.getNumDocsScanned());
     
statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
new file mode 100644
index 0000000000..a39ad1d265
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import javax.annotation.Nullable;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * {@code BrokerRequestHandlerDelegate} delegates the inbound broker request 
to one of the enabled
+ * {@link BrokerRequestHandler} based on the requested handle type.
+ *
+ * {@see: @CommonConstant
+ */
+public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
+
+  private final BrokerRequestHandler _singleStageBrokerRequestHandler;
+  private final MultiStageBrokerRequestHandler _multiStageWorkerRequestHandler;
+
+  private final boolean _isMultiStageQueryEngineEnabled;
+
+
+  public BrokerRequestHandlerDelegate(
+      BrokerRequestHandler singleStageBrokerRequestHandler,
+      @Nullable MultiStageBrokerRequestHandler multiStageWorkerRequestHandler
+  ) {
+    _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
+    _multiStageWorkerRequestHandler = multiStageWorkerRequestHandler;
+    _isMultiStageQueryEngineEnabled = _multiStageWorkerRequestHandler != null;
+  }
+
+  @Override
+  public void start() {
+    if (_singleStageBrokerRequestHandler != null) {
+      _singleStageBrokerRequestHandler.start();
+    }
+    if (_multiStageWorkerRequestHandler != null) {
+      _multiStageWorkerRequestHandler.start();
+    }
+  }
+
+  @Override
+  public void shutDown() {
+    if (_singleStageBrokerRequestHandler != null) {
+      _singleStageBrokerRequestHandler.shutDown();
+    }
+    if (_multiStageWorkerRequestHandler != null) {
+      _multiStageWorkerRequestHandler.shutDown();
+    }
+  }
+
+  @Override
+  public BrokerResponse handleRequest(JsonNode request, @Nullable 
RequesterIdentity requesterIdentity,
+      RequestContext requestContext)
+      throws Exception {
+    if (_isMultiStageQueryEngineEnabled && _multiStageWorkerRequestHandler != 
null) {
+      JsonNode node = 
request.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE);
+      if (node != null && node.asBoolean()) {
+        return _multiStageWorkerRequestHandler.handleRequest(request, 
requesterIdentity, requestContext);
+      }
+    }
+    return _singleStageBrokerRequestHandler.handleRequest(request, 
requesterIdentity, requestContext);
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 468add98c2..0bd33a9289 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -42,6 +42,8 @@ import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.RequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -49,6 +51,7 @@ import org.apache.pinot.spi.trace.RequestContext;
  */
 @ThreadSafe
 public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);
 
   private final GrpcQueryClient.Config _grpcConfig;
   private final StreamingReduceService _streamingReduceService;
@@ -59,6 +62,7 @@ public class GrpcBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
     super(config, routingManager, accessControlFactory, queryQuotaManager, 
tableCache, brokerMetrics);
+    LOGGER.info("Using Grpc BrokerRequestHandler.");
     _grpcConfig = buildGrpcQueryClientConfig(config);
 
     // create streaming query client
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
new file mode 100644
index 0000000000..e992031288
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.QueryEnvironment;
+import org.apache.pinot.query.catalog.PinotCatalog;
+import org.apache.pinot.query.mailbox.GrpcMailboxService;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.service.QueryDispatcher;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);
+  private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L;
+  private final String _reducerHostname;
+  private final int _reducerPort;
+
+  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private QueryEnvironment _queryEnvironment;
+  private QueryDispatcher _queryDispatcher;
+
+  public MultiStageBrokerRequestHandler(PinotConfiguration config, 
BrokerRoutingManager routingManager,
+      AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
+      BrokerMetrics brokerMetrics) {
+    super(config, routingManager, accessControlFactory, queryQuotaManager, 
tableCache, brokerMetrics);
+    LOGGER.info("Using Multi-stage BrokerRequestHandler.");
+    String reducerHostname = 
config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
+    if (reducerHostname == null) {
+      // use broker ID as host name, but remove the
+      String brokerId = 
config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
+      brokerId = 
brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? 
brokerId.substring(
+          CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId;
+      brokerId = StringUtils.split(brokerId, "_").length > 1 ? 
StringUtils.split(brokerId, "_")[0] : brokerId;
+      reducerHostname = brokerId;
+    }
+    _reducerHostname = reducerHostname;
+    _reducerPort = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+    _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
+        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
+        new WorkerManager(_reducerHostname, _reducerPort, routingManager));
+    _queryDispatcher = new QueryDispatcher();
+    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort);
+
+    // TODO: move this to a startUp() function.
+    _mailboxService.start();
+  }
+
+  @Override
+  public BrokerResponseNative handleRequest(JsonNode request, @Nullable 
RequesterIdentity requesterIdentity,
+      RequestContext requestContext)
+      throws Exception {
+    long requestId = _requestIdGenerator.incrementAndGet();
+    requestContext.setBrokerId(_brokerId);
+    requestContext.setRequestId(requestId);
+    requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
+
+    // First-stage access control to prevent unauthenticated requests from 
using up resources. Secondary table-level
+    // check comes later.
+    boolean hasAccess = 
_accessControlFactory.create().hasAccess(requesterIdentity);
+    if (!hasAccess) {
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
 1);
+      LOGGER.info("Access denied for requestId {}", requestId);
+      requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+      return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
+    }
+
+    JsonNode sql = request.get(CommonConstants.Broker.Request.SQL);
+    if (sql == null) {
+      throw new BadQueryRequestException("Failed to find 'sql' in the request: 
" + request);
+    } else {
+      return handleSQLRequest(requestId, 
request.get(CommonConstants.Broker.Request.SQL).asText(), request,
+          requesterIdentity, requestContext);
+    }
+  }
+
+  private BrokerResponseNative handleSQLRequest(long requestId, String query, 
JsonNode request,
+      @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
+      throws Exception {
+    LOGGER.debug("SQL query for request {}: {}", requestId, query);
+    requestContext.setQuery(query);
+
+    // Compile the request
+    long compilationStartTimeNs = System.nanoTime();
+    QueryPlan queryPlan;
+    try {
+      queryPlan = _queryEnvironment.planQuery(query);
+    } catch (Exception e) {
+      LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
+      requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
+    }
+
+    List<DataTable> queryResults = null;
+    try {
+      queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, 
_mailboxService, DEFAULT_TIMEOUT_NANO);
+    } catch (Exception e) {
+      LOGGER.info("query submission failed", e);
+      return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
+    }
+
+    BrokerResponseNative brokerResponse = new BrokerResponseNative();
+    long executionEndTimeNs = System.nanoTime();
+
+    // Set total query processing time
+    long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - 
compilationStartTimeNs);
+    brokerResponse.setTimeUsedMs(totalTimeMs);
+    brokerResponse.setResultTable(toResultTable(queryResults));
+    requestContext.setQueryProcessingTime(totalTimeMs);
+    augmentStatistics(requestContext, brokerResponse);
+    return brokerResponse;
+  }
+
+  @Override
+  protected BrokerResponseNative processBrokerRequest(long requestId, 
BrokerRequest originalBrokerRequest,
+      BrokerRequest serverBrokerRequest, @Nullable BrokerRequest 
offlineBrokerRequest,
+      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, 
@Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long 
timeoutMs, ServerStats serverStats,
+      RequestContext requestContext)
+      throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  private ResultTable toResultTable(List<DataTable> queryResult) {
+    DataSchema resultDataSchema = null;
+    List<Object[]> resultRows = new ArrayList<>();
+    for (DataTable dataTable : queryResult) {
+      resultDataSchema = resultDataSchema == null ? dataTable.getDataSchema() 
: resultDataSchema;
+      int numColumns = resultDataSchema.getColumnNames().length;
+      DataSchema.ColumnDataType[] resultColumnDataTypes = 
resultDataSchema.getColumnDataTypes();
+      List<Object[]> rows = new ArrayList<>(dataTable.getNumberOfRows());
+      for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
+        Object[] row = new Object[numColumns];
+        Object[] rawRow = 
SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+        for (int i = 0; i < numColumns; i++) {
+          row[i] = resultColumnDataTypes[i].convertAndFormat(rawRow[i]);
+        }
+        rows.add(row);
+      }
+      resultRows.addAll(rows);
+    }
+    return new ResultTable(resultDataSchema, resultRows);
+  }
+
+  @Override
+  public void start() {
+    // no-op
+  }
+
+  @Override
+  public void shutDown() {
+    _queryDispatcher.shutdown();
+    _mailboxService.shutdown();
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 2e399211cb..97a4b11e3f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -72,6 +72,7 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseBrokerRequestHandl
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig 
tlsConfig) {
     super(config, routingManager, accessControlFactory, queryQuotaManager, 
tableCache, brokerMetrics);
+    LOGGER.info("Using Netty BrokerRequestHandler.");
 
     _brokerReduceService = new BrokerReduceService(_config);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics, nettyConfig, 
tlsConfig);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
index 6977638ced..b18046381f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
@@ -91,6 +91,18 @@ public class InstanceUtils {
     } else {
       simpleFields.remove(Helix.Instance.ADMIN_PORT_KEY);
     }
+    int queryServicePort = instance.getQueryServicePort();
+    if (queryServicePort > 0) {
+      
simpleFields.put(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, 
Integer.toString(queryServicePort));
+    } else {
+      
simpleFields.remove(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY);
+    }
+    int queryMailboxPort = instance.getQueryMailboxPort();
+    if (queryMailboxPort > 0) {
+      
simpleFields.put(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, 
Integer.toString(queryMailboxPort));
+    } else {
+      
simpleFields.remove(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY);
+    }
     boolean queriesDisabled = instance.isQueriesDisabled();
     if (queriesDisabled) {
       simpleFields.put(Helix.QUERIES_DISABLED, Boolean.toString(true));
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java
index 09cc1821ae..b687c74cbf 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java
@@ -39,7 +39,7 @@ public class InstanceUtilsTest {
 
   @Test
   public void testToHelixInstanceConfig() {
-    Instance instance = new Instance("localhost", 1234, 
InstanceType.CONTROLLER, null, null, 0, 0, false);
+    Instance instance = new Instance("localhost", 1234, 
InstanceType.CONTROLLER, null, null, 0, 0, 0, 0, false);
     InstanceConfig instanceConfig = 
InstanceUtils.toHelixInstanceConfig(instance);
     assertEquals(instanceConfig.getInstanceName(), 
"Controller_localhost_1234");
     assertTrue(instanceConfig.getInstanceEnabled());
@@ -50,10 +50,12 @@ public class InstanceUtilsTest {
     assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED));
 
     List<String> tags = Collections.singletonList("DefaultTenant_BROKER");
-    instance = new Instance("localhost", 2345, InstanceType.BROKER, tags, 
null, 0, 0, false);
+    instance = new Instance("localhost", 2345, InstanceType.BROKER, tags, 
null, 0, 0, 0, 0, false);
     instanceConfig = InstanceUtils.toHelixInstanceConfig(instance);
     assertEquals(instanceConfig.getInstanceName(), "Broker_localhost_2345");
     assertTrue(instanceConfig.getInstanceEnabled());
@@ -64,13 +66,15 @@ public class InstanceUtilsTest {
     assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED));
 
     tags = Arrays.asList("T1_OFFLINE", "T2_REALTIME");
     Map<String, Integer> poolMap = new TreeMap<>();
     poolMap.put("T1_OFFLINE", 0);
     poolMap.put("T2_REALTIME", 1);
-    instance = new Instance("localhost", 3456, InstanceType.SERVER, tags, 
poolMap, 123, 234, true);
+    instance = new Instance("localhost", 3456, InstanceType.SERVER, tags, 
poolMap, 123, 234, 345, 456, true);
     instanceConfig = InstanceUtils.toHelixInstanceConfig(instance);
     assertEquals(instanceConfig.getInstanceName(), "Server_localhost_3456");
     assertTrue(instanceConfig.getInstanceEnabled());
@@ -84,10 +88,14 @@ public class InstanceUtilsTest {
     assertEquals(znRecord.getMapField(InstanceUtils.POOL_KEY), 
expectedPoolMap);
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY),
 "123");
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY),
 "234");
+    
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY),
+        "345");
+    
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY),
+        "456");
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED), 
"true");
 
     tags = Collections.singletonList("minion_untagged");
-    instance = new Instance("localhost", 4567, InstanceType.MINION, tags, 
null, 0, 0, false);
+    instance = new Instance("localhost", 4567, InstanceType.MINION, tags, 
null, 0, 0, 0, 0, false);
     instanceConfig = InstanceUtils.toHelixInstanceConfig(instance);
     assertEquals(instanceConfig.getInstanceName(), "Minion_localhost_4567");
     assertTrue(instanceConfig.getInstanceEnabled());
@@ -98,6 +106,8 @@ public class InstanceUtilsTest {
     assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY));
+    
assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY));
     
assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED));
   }
 
@@ -105,7 +115,7 @@ public class InstanceUtilsTest {
   public void testUpdateHelixInstanceConfig() {
     Instance instance =
         new Instance("localhost", 1234, InstanceType.SERVER, 
Collections.singletonList("DefaultTenant_OFFLINE"), null,
-            0, 123, false);
+            0, 123, 234, 345, false);
     InstanceConfig instanceConfig = 
InstanceUtils.toHelixInstanceConfig(instance);
 
     // Put some custom fields, which should not be updated
@@ -120,7 +130,7 @@ public class InstanceUtilsTest {
     Map<String, Integer> poolMap = new TreeMap<>();
     poolMap.put("T1_OFFLINE", 0);
     poolMap.put("T2_REALTIME", 1);
-    instance = new Instance("myHost", 2345, InstanceType.SERVER, tags, 
poolMap, 123, 234, true);
+    instance = new Instance("myHost", 2345, InstanceType.SERVER, tags, 
poolMap, 123, 234, 345, 456, true);
     InstanceUtils.updateHelixInstanceConfig(instanceConfig, instance);
 
     // Instance name should not change
@@ -136,6 +146,10 @@ public class InstanceUtilsTest {
     assertEquals(znRecord.getMapField(InstanceUtils.POOL_KEY), 
expectedPoolMap);
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY),
 "123");
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY),
 "234");
+    
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY),
+        "345");
+    
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY),
+        "456");
     
assertEquals(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED), 
"true");
 
     // Custom fields should be preserved
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index 6b4b10b933..87b3659bb5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -113,6 +113,8 @@ public class PinotInstanceRestletResource {
     response.set("pools", 
JsonUtils.objectToJsonNode(instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY)));
     response.put("grpcPort", getGrpcPort(instanceConfig));
     response.put("adminPort", getAdminPort(instanceConfig));
+    response.put("queryServicePort", getQueryServicePort(instanceConfig));
+    response.put("queryMailboxPort", getQueryMailboxPort(instanceConfig));
     String queriesDisabled = 
instanceConfig.getRecord().getSimpleField(CommonConstants.Helix.QUERIES_DISABLED);
     if ("true".equalsIgnoreCase(queriesDisabled)) {
       response.put(CommonConstants.Helix.QUERIES_DISABLED, "true");
@@ -145,6 +147,32 @@ public class PinotInstanceRestletResource {
     return Instance.NOT_SET_ADMIN_PORT_VALUE;
   }
 
+  private int getQueryServicePort(InstanceConfig instanceConfig) {
+    String queryServicePortStr = instanceConfig.getRecord().getSimpleField(
+        
CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY);
+    if (queryServicePortStr != null) {
+      try {
+        return Integer.parseInt(queryServicePortStr);
+      } catch (Exception e) {
+        LOGGER.warn("Illegal service port: {} for instance: {}", 
queryServicePortStr, instanceConfig.getInstanceName());
+      }
+    }
+    return Instance.NOT_SET_GRPC_PORT_VALUE;
+  }
+
+  private int getQueryMailboxPort(InstanceConfig instanceConfig) {
+    String queryMailboxPortStr = instanceConfig.getRecord().getSimpleField(
+        
CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY);
+    if (queryMailboxPortStr != null) {
+      try {
+        return Integer.parseInt(queryMailboxPortStr);
+      } catch (Exception e) {
+        LOGGER.warn("Illegal mailbox port: {} for instance: {}", 
queryMailboxPortStr, instanceConfig.getInstanceName());
+      }
+    }
+    return Instance.NOT_SET_GRPC_PORT_VALUE;
+  }
+
   private Map<String, String> getSystemResourceInfo(InstanceConfig 
instanceConfig) {
     return 
instanceConfig.getRecord().getMapField(CommonConstants.Helix.Instance.SYSTEM_RESOURCE_INFO_KEY);
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index fd37cd59d7..13ce05ead5 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -84,10 +84,11 @@ public class PinotInstanceRestletResourceTest {
 
     // Create untagged broker and server instances
     String createInstanceUrl = 
TEST_INSTANCE.getControllerRequestURLBuilder().forInstanceCreate();
-    Instance brokerInstance = new Instance("1.2.3.4", 1234, 
InstanceType.BROKER, null, null, 0, 0, false);
+    Instance brokerInstance = new Instance("1.2.3.4", 1234, 
InstanceType.BROKER, null, null, 0, 0, 0, 0, false);
     ControllerTest.sendPostRequest(createInstanceUrl, 
brokerInstance.toJsonString());
 
-    Instance serverInstance = new Instance("1.2.3.4", 2345, 
InstanceType.SERVER, null, null, 8090, 8091, false);
+    Instance serverInstance = new Instance("1.2.3.4", 2345, 
InstanceType.SERVER, null, null, 8090, 8091, 8092, 8093,
+        false);
     ControllerTest.sendPostRequest(createInstanceUrl, 
serverInstance.toJsonString());
 
     // Check that we have added two more instances
@@ -95,7 +96,8 @@ public class PinotInstanceRestletResourceTest {
 
     // Create broker and server instances with tags and pools
     brokerInstance =
-        new Instance("2.3.4.5", 1234, InstanceType.BROKER, 
Collections.singletonList("tag_BROKER"), null, 0, 0, false);
+        new Instance("2.3.4.5", 1234, InstanceType.BROKER, 
Collections.singletonList("tag_BROKER"), null, 0, 0, 0, 0,
+            false);
     ControllerTest.sendPostRequest(createInstanceUrl, 
brokerInstance.toJsonString());
 
     Map<String, Integer> serverPools = new TreeMap<>();
@@ -103,7 +105,7 @@ public class PinotInstanceRestletResourceTest {
     serverPools.put("tag_REALTIME", 1);
     serverInstance =
         new Instance("2.3.4.5", 2345, InstanceType.SERVER, 
Arrays.asList("tag_OFFLINE", "tag_REALTIME"), serverPools,
-            18090, 18091, false);
+            18090, 18091, 18092, 18093, false);
     ControllerTest.sendPostRequest(createInstanceUrl, 
serverInstance.toJsonString());
 
     // Check that we have added four instances so far
@@ -137,7 +139,8 @@ public class PinotInstanceRestletResourceTest {
     // Test PUT Instance API
     String newBrokerTag = "new-broker-tag";
     Instance newBrokerInstance =
-        new Instance("1.2.3.4", 1234, InstanceType.BROKER, 
Collections.singletonList(newBrokerTag), null, 0, 0, false);
+        new Instance("1.2.3.4", 1234, InstanceType.BROKER, 
Collections.singletonList(newBrokerTag), null, 0, 0, 0, 0,
+            false);
     String brokerInstanceId = "Broker_1.2.3.4_1234";
     String brokerInstanceUrl = 
TEST_INSTANCE.getControllerRequestURLBuilder().forInstance(brokerInstanceId);
     ControllerTest.sendPutRequest(brokerInstanceUrl, 
newBrokerInstance.toJsonString());
@@ -145,7 +148,7 @@ public class PinotInstanceRestletResourceTest {
     String newServerTag = "new-server-tag";
     Instance newServerInstance =
         new Instance("1.2.3.4", 2345, InstanceType.SERVER, 
Collections.singletonList(newServerTag), null, 28090, 28091,
-            true);
+            28092, 28093, true);
     String serverInstanceId = "Server_1.2.3.4_2345";
     String serverInstanceUrl = 
TEST_INSTANCE.getControllerRequestURLBuilder().forInstance(serverInstanceId);
     ControllerTest.sendPutRequest(serverInstanceUrl, 
newServerInstance.toJsonString());
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 88311095c0..b2bccec660 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -228,7 +228,7 @@ public class PinotHelixResourceManagerStatelessTest extends 
ControllerTest {
     String brokerToUntag = 
taggedBrokers.remove(ThreadLocalRandom.current().nextInt(3));
     Instance instance =
         new Instance("localhost", brokerToUntag.charAt(brokerToUntag.length() 
- 1) - '0', InstanceType.BROKER,
-            
Collections.singletonList(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE), 
null, 0, 0, false);
+            
Collections.singletonList(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE), 
null, 0, 0, 0, 0, false);
     assertTrue(_helixResourceManager.updateInstance(brokerToUntag, instance, 
true).isSuccessful());
     untaggedBrokers.add(brokerToUntag);
     checkBrokerResource(taggedBrokers);
@@ -241,7 +241,8 @@ public class PinotHelixResourceManagerStatelessTest extends 
ControllerTest {
 
     // Add a new broker instance
     Instance newBrokerInstance =
-        new Instance("localhost", 5, InstanceType.BROKER, 
Collections.singletonList(brokerTag), null, 0, 0, false);
+        new Instance("localhost", 5, InstanceType.BROKER, 
Collections.singletonList(brokerTag), null, 0, 0, 0, 0,
+            false);
     assertTrue(_helixResourceManager.addInstance(newBrokerInstance, 
true).isSuccessful());
     String newBrokerId = InstanceUtils.getHelixInstanceId(newBrokerInstance);
     taggedBrokers.add(newBrokerId);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index b167a18256..455496bbee 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -206,7 +206,7 @@ public class PinotHelixResourceManagerTest {
 
     // Add new instance.
     Instance instance = new Instance("localhost", biggerRandomNumber, 
InstanceType.SERVER,
-        Collections.singletonList(UNTAGGED_SERVER_INSTANCE), null, 0, 0, 
false);
+        Collections.singletonList(UNTAGGED_SERVER_INSTANCE), null, 0, 0, 0, 0, 
false);
     TEST_INSTANCE.getHelixResourceManager().addInstance(instance, false);
 
     List<String> allInstances = 
TEST_INSTANCE.getHelixResourceManager().getAllInstances();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
index b5b1049f49..149c1b0559 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
 
 
 public class ServerInstance {
+
   public enum RoutingType {
     NETTY, GRPC, NETTY_TLS
   }
@@ -40,6 +41,9 @@ public class ServerInstance {
   private final int _grpcPort;
   private final int _nettyTlsPort;
 
+  private final int _queryServicePort;
+  private final int _queryMailboxPort;
+
   /**
    * By default (auto joined instances), server instance name is of format: 
{@code Server_<hostname>_<port>}, e.g.
    * {@code Server_localhost_12345}, hostname is of format: {@code 
Server_<hostname>}, e.g. {@code Server_localhost}.
@@ -67,6 +71,10 @@ public class ServerInstance {
     }
     _grpcPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.GRPC_PORT_KEY, 
INVALID_PORT);
     _nettyTlsPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.NETTY_TLS_PORT_KEY, 
INVALID_PORT);
+    _queryServicePort = 
instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY,
+        INVALID_PORT);
+    _queryMailboxPort = 
instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
+        INVALID_PORT);
   }
 
   @VisibleForTesting
@@ -76,6 +84,8 @@ public class ServerInstance {
     _port = port;
     _grpcPort = INVALID_PORT;
     _nettyTlsPort = INVALID_PORT;
+    _queryServicePort = INVALID_PORT;
+    _queryMailboxPort = INVALID_PORT;
   }
 
   public String getInstanceId() {
@@ -93,6 +103,13 @@ public class ServerInstance {
   public int getGrpcPort() {
     return _grpcPort;
   }
+  public int getQueryServicePort() {
+    return _queryServicePort;
+  }
+
+  public int getQueryMailboxPort() {
+    return _queryMailboxPort;
+  }
 
   public int getNettyTlsPort() {
     return _nettyTlsPort;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
index c7e6bcfbd5..6bab8a2b83 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java
@@ -36,16 +36,21 @@ public class WorkerInstance extends ServerInstance {
     super(instanceConfig);
   }
 
-  public WorkerInstance(String hostname, int serverPort, int mailboxPort) {
-    super(toInstanceConfig(hostname, serverPort, mailboxPort));
+  public WorkerInstance(String hostname, int nettyPort, int grpcPort, int 
servicePort, int mailboxPort) {
+    super(toInstanceConfig(hostname, nettyPort, grpcPort, servicePort, 
mailboxPort));
   }
 
-  private static InstanceConfig toInstanceConfig(String hostname, int 
serverPort, int mailboxPort) {
-    String server = String.format("%s%s_%d", 
CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, serverPort);
+  private static InstanceConfig toInstanceConfig(String hostname, int 
nettyPort, int grpcPort, int servicePort,
+      int mailboxPort) {
+    String server = String.format("%s%s_%d", 
CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, nettyPort);
     InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server);
     ZNRecord znRecord = instanceConfig.getRecord();
     Map<String, String> simpleFields = znRecord.getSimpleFields();
-    simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, 
String.valueOf(mailboxPort));
+    simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, 
String.valueOf(grpcPort));
+    
simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY,
+        String.valueOf(servicePort));
+    
simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
+        String.valueOf(mailboxPort));
     return instanceConfig;
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 74e2036e77..aa19d3bf63 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -66,7 +66,7 @@ public class WorkerManager {
     } else if (PlannerUtils.isRootStage(stageId)) {
       // ROOT stage doesn't have a QueryServer as it is strictly only reducing 
results.
       // here we simply assign the worker instance with identical 
server/mailbox port number.
-      stageMetadata.setServerInstances(Lists.newArrayList(new 
WorkerInstance(_hostName, _port, _port)));
+      stageMetadata.setServerInstances(Lists.newArrayList(new 
WorkerInstance(_hostName, _port, _port, _port, _port)));
     } else {
       
stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values()));
     }
@@ -76,9 +76,10 @@ public class WorkerManager {
     List<ServerInstance> serverInstances = new ArrayList<>();
     for (ServerInstance server : servers) {
       String hostname = server.getHostname();
-      if 
(!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && 
!hostname.startsWith(
-          CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) && 
!hostname.startsWith(
-          CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE) && 
server.getGrpcPort() > 0) {
+      if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0
+          && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)
+          && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE)
+          && 
!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) {
         serverInstances.add(server);
       }
     }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
index 7585f9bd7c..0993f892f1 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -90,8 +90,8 @@ public class QueryEnvironmentTestUtils {
     String server2 = String.format("localhost_%d", port2);
     // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
     // this is only use for test identifier purpose.
-    ServerInstance host1 = new WorkerInstance("localhost", port1, port1);
-    ServerInstance host2 = new WorkerInstance("localhost", port2, port2);
+    ServerInstance host1 = new WorkerInstance("localhost", port1, port1, 
port1, port1);
+    ServerInstance host2 = new WorkerInstance("localhost", port2, port2, 
port2, port2);
 
     RoutingTable rtA = mock(RoutingTable.class);
     when(rtA.getServerInstanceToSegmentsMap()).thenReturn(
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index b84d8f40f0..37a203f815 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -125,6 +125,6 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
 
   private String toMailboxId(ServerInstance serverInstance) {
     return new StringMailboxIdentifier(String.format("%s_%s", _jobId, 
_stageId), serverInstance.getHostname(),
-        serverInstance.getGrpcPort(), _hostName, _port).toString();
+        serverInstance.getQueryMailboxPort(), _hostName, _port).toString();
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 8629663b60..3681acdc7d 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -215,6 +215,6 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
 
   private String toMailboxId(ServerInstance serverInstance) {
     return new StringMailboxIdentifier(String.format("%s_%s", _jobId, 
_stageId), _serverHostName, _serverPort,
-        serverInstance.getHostname(), serverInstance.getGrpcPort()).toString();
+        serverInstance.getHostname(), 
serverInstance.getQueryMailboxPort()).toString();
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 034bf561a2..49c2d62dcb 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -59,12 +59,15 @@ public class QueryPlanSerDeUtils {
 
   public static ServerInstance stringToInstance(String serverInstanceString) {
     String[] s = StringUtils.split(serverInstanceString, '_');
-    return new WorkerInstance(s[0], Integer.parseInt(s[1]), 
Integer.parseInt(s[2]));
+    // Skipped netty and grpc port as they are not used in worker instance.
+    return new WorkerInstance(s[0], Integer.parseInt(s[1]), 
Integer.parseInt(s[2]), Integer.parseInt(s[3]),
+        Integer.parseInt(s[4]));
   }
 
   public static String instanceToString(ServerInstance serverInstance) {
     return StringUtils.join(serverInstance.getHostname(), '_', 
serverInstance.getPort(), '_',
-        serverInstance.getGrpcPort());
+        serverInstance.getGrpcPort(), '_', 
serverInstance.getQueryServicePort(), '_',
+        serverInstance.getQueryMailboxPort());
   }
 
   public static Map<Integer, StageMetadata> 
protoMapToStageMetadataMap(Map<Integer, Worker.StageMetadata> protoMap) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index fa9f04808c..260c083ea7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.service;
 
+import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.ArrayList;
 import java.util.List;
@@ -80,14 +81,15 @@ public class QueryDispatcher {
         List<ServerInstance> serverInstances = 
stage.getValue().getServerInstances();
         for (ServerInstance serverInstance : serverInstances) {
           String host = serverInstance.getHostname();
-          int port = serverInstance.getPort();
-          DispatchClient client = getOrCreateDispatchClient(host, port);
+          int servicePort = serverInstance.getQueryServicePort();
+          int mailboxPort = serverInstance.getQueryMailboxPort();
+          DispatchClient client = getOrCreateDispatchClient(host, servicePort);
           Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder()
               
.setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan,
 stageId,
                   serverInstance)))
               .putMetadata("REQUEST_ID", String.valueOf(requestId))
               .putMetadata("SERVER_INSTANCE_HOST", 
serverInstance.getHostname())
-              .putMetadata("SERVER_INSTANCE_PORT", 
String.valueOf(serverInstance.getGrpcPort())).build());
+              .putMetadata("SERVER_INSTANCE_PORT", 
String.valueOf(mailboxPort)).build());
           if (response.containsMetadata("ERROR")) {
             throw new RuntimeException(
                 String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", stageId,
@@ -134,12 +136,21 @@ public class QueryDispatcher {
     return mailboxReceiveOperator;
   }
 
+  public void shutdown() {
+    for (DispatchClient dispatchClient : _dispatchClientMap.values()) {
+      dispatchClient._managedChannel.shutdown();
+    }
+    _dispatchClientMap.clear();
+  }
+
   public static class DispatchClient {
     private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub 
_blockingStub;
+    private final ManagedChannel _managedChannel;
 
     public DispatchClient(String host, int port) {
       ManagedChannelBuilder managedChannelBuilder = 
ManagedChannelBuilder.forAddress(host, port).usePlaintext();
-      _blockingStub = 
PinotQueryWorkerGrpc.newBlockingStub(managedChannelBuilder.build());
+      _managedChannel = managedChannelBuilder.build();
+      _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(_managedChannel);
     }
 
     public Worker.QueryResponse submit(Worker.QueryRequest request) {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index d133584797..f76917b766 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -87,8 +87,10 @@ public class QueryRunnerTest {
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
     // this is only use for test identifier purpose.
-    _servers.put(new WorkerInstance("localhost", server1.getPort(), 
server1.getPort()), server1);
-    _servers.put(new WorkerInstance("localhost", server2.getPort(), 
server2.getPort()), server2);
+    int port1 = server1.getPort();
+    int port2 = server2.getPort();
+    _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1), 
server1);
+    _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2), 
server2);
   }
 
   @AfterClass
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 735c54dd0c..245f78e651 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -70,7 +70,8 @@ public class QueryServerTest {
       _queryRunnerMap.put(availablePort, queryRunner);
       // this only test the QueryServer functionality so the server port can 
be the same as the mailbox port.
       // this is only use for test identifier purpose.
-      _queryServerInstanceMap.put(availablePort, new 
WorkerInstance("localhost", availablePort, availablePort));
+      _queryServerInstanceMap.put(availablePort, new 
WorkerInstance("localhost", availablePort, availablePort,
+          availablePort, availablePort));
     }
 
     List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet());
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index d1ab3c83c6..4408b3864d 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -47,6 +47,14 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-planner</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-query-runtime</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-core</artifactId>
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java 
b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index d630013419..a1f8e910af 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -96,6 +96,11 @@ public class ServerConf {
     return _serverConf.getProperty(Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, 
Server.DEFAULT_GRPCTLS_SERVER_ENABLED);
   }
 
+  public boolean isMultiStageServerEnabled() {
+    return _serverConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED,
+        Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED);
+  }
+
   public boolean isEnableSwagger() {
     return _serverConf.getProperty(CONFIG_OF_SWAGGER_SERVER_ENABLED, 
DEFAULT_SWAGGER_SERVER_ENABLED);
   }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 7041a40b24..4103950621 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.server.worker.WorkerQueryServer;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -63,6 +64,8 @@ public class ServerInstance {
   private final GrpcQueryServer _grpcQueryServer;
   private final AccessControl _accessControl;
 
+  private final WorkerQueryServer _workerQueryServer;
+
   private boolean _started = false;
 
   public ServerInstance(ServerConf serverConf, HelixManager helixManager, 
AccessControlFactory accessControlFactory)
@@ -102,30 +105,42 @@ public class ServerInstance {
       
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
 helixManager);
     _accessControl = accessControlFactory.create();
 
-    if (serverConf.isNettyServerEnabled()) {
-      int nettyPort = serverConf.getNettyPort();
-      LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
-      _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, 
_serverMetrics, nettyConfig);
-    } else {
+    if (serverConf.isMultiStageServerEnabled()) {
+      // WorkerQueryServer initialization
+      // because worker requires both the "Netty port" for protocol transport; 
and "GRPC port" for mailbox transport,
+      // we can't enable any of the other 2 servers
+      // TODO: decouple server protocol and engine type.
+      _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), 
_instanceDataManager, _serverMetrics);
       _nettyQueryServer = null;
-    }
-
-    if (serverConf.isNettyTlsServerEnabled()) {
-      int nettySecPort = serverConf.getNettyTlsPort();
-      LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
-      _nettyTlsQueryServer =
-          new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, 
nettyConfig, tlsConfig, _accessControl);
-    } else {
       _nettyTlsQueryServer = null;
-    }
-    if (serverConf.isEnableGrpcServer()) {
-      int grpcPort = serverConf.getGrpcPort();
-      LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
-      _grpcQueryServer = new GrpcQueryServer(grpcPort,
-          serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
-              CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
-    } else {
       _grpcQueryServer = null;
+    } else {
+      _workerQueryServer = null;
+      if (serverConf.isNettyServerEnabled()) {
+        int nettyPort = serverConf.getNettyPort();
+        LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
+        _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, 
_serverMetrics, nettyConfig);
+      } else {
+        _nettyQueryServer = null;
+      }
+
+      if (serverConf.isNettyTlsServerEnabled()) {
+        int nettySecPort = serverConf.getNettyTlsPort();
+        LOGGER.info("Initializing TLS-secured Netty query server on port: {}", 
nettySecPort);
+        _nettyTlsQueryServer =
+            new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, 
nettyConfig, tlsConfig, _accessControl);
+      } else {
+        _nettyTlsQueryServer = null;
+      }
+      if (serverConf.isEnableGrpcServer()) {
+        int grpcPort = serverConf.getGrpcPort();
+        LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
+        _grpcQueryServer = new GrpcQueryServer(grpcPort,
+            serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
+                CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, 
_queryExecutor, _serverMetrics, _accessControl);
+      } else {
+        _grpcQueryServer = null;
+      }
     }
 
     LOGGER.info("Initializing transform functions");
@@ -171,6 +186,10 @@ public class ServerInstance {
       LOGGER.info("Starting gRPC query server");
       _grpcQueryServer.start();
     }
+    if (_workerQueryServer != null) {
+      LOGGER.info("Starting worker query server");
+      _workerQueryServer.start();
+    }
 
     _started = true;
     LOGGER.info("Finish starting server instance");
@@ -196,6 +215,10 @@ public class ServerInstance {
       LOGGER.info("Shutting down Netty query server");
       _nettyQueryServer.shutDown();
     }
+    if (_workerQueryServer != null) {
+      LOGGER.info("Shutting down worker query server");
+      _workerQueryServer.shutDown();
+    }
     LOGGER.info("Shutting down query scheduler");
     _queryScheduler.stop();
     LOGGER.info("Shutting down query executor");
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
new file mode 100644
index 0000000000..711c763c38
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.worker;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.query.service.QueryServer;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.NetUtils;
+
+
+public class WorkerQueryServer {
+  private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
+
+  private final ExecutorService _executor;
+  private final int _queryServicePort;
+  private final PinotConfiguration _configuration;
+
+  private QueryServer _queryWorkerService;
+  private QueryRunner _queryRunner;
+  private InstanceDataManager _instanceDataManager;
+  private ServerMetrics _serverMetrics;
+
+  public WorkerQueryServer(PinotConfiguration configuration, 
InstanceDataManager instanceDataManager,
+      ServerMetrics serverMetrics) {
+    _configuration = toWorkerQueryConfig(configuration);
+    _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
+    _queryServicePort =
+        _configuration.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 
QueryConfig.DEFAULT_QUERY_SERVER_PORT);
+    _queryRunner = new QueryRunner();
+    _queryRunner.init(_configuration, _instanceDataManager, _serverMetrics);
+    _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
+    _executor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM,
+        new NamedThreadFactory("worker_query_server_enclosure_on_" + 
_queryServicePort + "_port"));
+  }
+
+  private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration 
configuration) {
+    PinotConfiguration newConfig = new 
PinotConfiguration(configuration.toMap());
+    String hostname = 
newConfig.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
+    if (hostname == null) {
+      String instanceId =
+          
newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, 
NetUtils.getHostnameOrAddress());
+      hostname = 
instanceId.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? 
instanceId.substring(
+          CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId;
+      newConfig.addProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, 
hostname);
+    }
+    int runnerPort = 
newConfig.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 
QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
+    if (runnerPort == -1) {
+      runnerPort =
+          newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT, 
CommonConstants.Server.DEFAULT_GRPC_PORT);
+      newConfig.addProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, runnerPort);
+    }
+    int servicePort =
+        newConfig.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, 
QueryConfig.DEFAULT_QUERY_SERVER_PORT);
+    if (servicePort == -1) {
+      servicePort = 
newConfig.getProperty(CommonConstants.Server.CONFIG_OF_NETTY_PORT,
+          CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
+      newConfig.addProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, servicePort);
+    }
+    return newConfig;
+  }
+
+  public int getPort() {
+    return _queryServicePort;
+  }
+
+  public void start() {
+    try {
+      _queryWorkerService.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void shutDown() {
+    _queryWorkerService.shutdown();
+    _executor.shutdown();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java
index 6e0b502632..a5e5109eef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java
@@ -46,6 +46,8 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class Instance extends BaseJsonConfig {
   public static final int NOT_SET_GRPC_PORT_VALUE = -1;
   public static final int NOT_SET_ADMIN_PORT_VALUE = -1;
+  public static final int NOT_SET_QUERY_SERVER_PORT_VALUE = -1;
+  public static final int NOT_SET_QUERY_MAILBOX_PORT_VALUE = -1;
 
   private final String _host;
   private final int _port;
@@ -54,6 +56,8 @@ public class Instance extends BaseJsonConfig {
   private final Map<String, Integer> _pools;
   private final int _grpcPort;
   private final int _adminPort;
+  private final int _queryServicePort;
+  private final int _queryMailboxPort;
   private final boolean _queriesDisabled;
 
   @JsonCreator
@@ -62,6 +66,7 @@ public class Instance extends BaseJsonConfig {
       @JsonProperty(value = "type", required = true) InstanceType type,
       @JsonProperty("tags") @Nullable List<String> tags, 
@JsonProperty("pools") @Nullable Map<String, Integer> pools,
       @JsonProperty("grpcPort") int grpcPort, @JsonProperty("adminPort") int 
adminPort,
+      @JsonProperty("queryServicePort") int queryServicePort, 
@JsonProperty("queryMailboxPort") int queryMailboxPort,
       @JsonProperty("queriesDisabled") boolean queriesDisabled) {
     Preconditions.checkArgument(host != null, "'host' must be configured");
     Preconditions.checkArgument(type != null, "'type' must be configured");
@@ -80,6 +85,16 @@ public class Instance extends BaseJsonConfig {
     } else {
       _adminPort = adminPort;
     }
+    if (queryServicePort == 0) {
+      _queryServicePort = NOT_SET_QUERY_SERVER_PORT_VALUE;
+    } else {
+      _queryServicePort = queryServicePort;
+    }
+    if (queryMailboxPort == 0) {
+      _queryMailboxPort = NOT_SET_QUERY_MAILBOX_PORT_VALUE;
+    } else {
+      _queryMailboxPort = queryMailboxPort;
+    }
     _queriesDisabled = queriesDisabled;
   }
 
@@ -113,6 +128,14 @@ public class Instance extends BaseJsonConfig {
     return _adminPort;
   }
 
+  public int getQueryServicePort() {
+    return _queryServicePort;
+  }
+
+  public int getQueryMailboxPort() {
+    return _queryMailboxPort;
+  }
+
   public boolean isQueriesDisabled() {
     return _queriesDisabled;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d2e6a63f8d..b150a53ad5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -138,6 +138,10 @@ public class CommonConstants {
       public static final String ADMIN_HTTPS_PORT_KEY = "adminHttpsPort";
       public static final String GRPC_PORT_KEY = "grpcPort";
       public static final String NETTY_TLS_PORT_KEY = "nettyTlsPort";
+
+      public static final String MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY = 
"queryServerPort";
+      public static final String MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY = 
"queryMailboxPort";
+
       public static final String SYSTEM_RESOURCE_INFO_KEY = 
"SYSTEM_RESOURCE_INFO";
     }
 
@@ -172,6 +176,9 @@ public class CommonConstants {
     public static final String CONFIG_OF_PINOT_BROKER_STARTABLE_CLASS = 
"pinot.broker.startable.class";
     public static final String CONFIG_OF_PINOT_SERVER_STARTABLE_CLASS = 
"pinot.server.startable.class";
     public static final String CONFIG_OF_PINOT_MINION_STARTABLE_CLASS = 
"pinot.minion.startable.class";
+
+    public static final String CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED = 
"pinot.multistage.engine.enabled";
+    public static final boolean DEFAULT_MULTI_STAGE_ENGINE_ENABLED = false;
   }
 
   public static class Broker {
@@ -218,11 +225,15 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = 
"pinot.broker.groupby.trim.threshold";
     public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000;
 
+    // Configure the request handler type used by broker to handler inbound 
query request.
+    // NOTE: the request handler type refers to the communication between 
Broker and Server.
     public static final String BROKER_REQUEST_HANDLER_TYPE = 
"pinot.broker.request.handler.type";
     public static final String NETTY_BROKER_REQUEST_HANDLER_TYPE = "netty";
     public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc";
+    public static final String MULTI_STAGE_BROKER_REQUEST_HANDLER_TYPE = 
"multistage";
     public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = 
NETTY_BROKER_REQUEST_HANDLER_TYPE;
 
+
     public static final String BROKER_TLS_PREFIX = "pinot.broker.tls";
     public static final String BROKER_NETTY_PREFIX = "pinot.broker.netty";
     public static final String BROKER_NETTYTLS_ENABLED = 
"pinot.broker.nettytls.enabled";
@@ -253,6 +264,7 @@ public class CommonConstants {
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = 
"minServerGroupTrimSize";
         public static final String NUM_REPLICA_GROUPS_TO_QUERY = 
"numReplicaGroupsToQuery";
         public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
+        public static final String USE_MULTISTAGE_ENGINE = 
"useMultistageEngine";
 
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to