This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f88a275  Add streaming query handler (#5717)
f88a275 is described below

commit f88a2759448b8d9eaeeb1b33b894e24df852b9c2
Author: Elon Azoulay <elon.azou...@gmail.com>
AuthorDate: Tue Sep 8 18:12:49 2020 -0700

    Add streaming query handler (#5717)
    
    Add a streaming query handler and a gRPC query server to stream the query responses.
    Currently, only streaming selection-only queries are supported.
    
    Added `GrpcRequestBuilder` and `GrpcQueryClient` to help build the request and query the server.
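    
    For context, a minimal client-side sketch of how the two helpers compose. The
    builder/client APIs, the "responseType" metadata protocol, and the default gRPC
    port 8090 come from the diff below; the broker id, table, and segment names are
    hypothetical:
    
        import java.util.Collections;
        import java.util.Iterator;
        import org.apache.pinot.common.proto.Server;
        import org.apache.pinot.common.utils.CommonConstants.Query.Response;
        import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
        import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
        
        public class StreamingQueryClientSketch {
          public static void main(String[] args) {
            // Build a streaming request for a selection-only query (hypothetical table/segment).
            Server.ServerRequest request = new GrpcRequestBuilder()
                .setRequestId(1)
                .setBrokerId("sketchBroker")
                .setEnableStreaming(true)
                .setSql("SELECT col1, col2 FROM myTable_OFFLINE LIMIT 100")
                .setSegments(Collections.singletonList("myTable_OFFLINE_segment_0"))
                .build();
        
            // Submit to the server's gRPC port and consume the stream: zero or more
            // "data" responses followed by one single "metadata" response.
            GrpcQueryClient client = new GrpcQueryClient("localhost", 8090);
            Iterator<Server.ServerResponse> responses = client.submit(request);
            while (responses.hasNext()) {
              Server.ServerResponse response = responses.next();
              String responseType =
                  response.getMetadataOrDefault(Response.MetadataKeys.RESPONSE_TYPE, "");
              System.out.println(responseType + " response, " + response.getPayload().size() + " payload bytes");
            }
          }
        }
    
    Each "data" payload is a serialized DataTable holding a batch of selected rows;
    the final "metadata" payload carries only the execution metadata.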
    
    Co-authored-by: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
---
 .../pinot/common/exception/QueryException.java     |   4 +
 .../apache/pinot/common/utils/CommonConstants.java |  39 ++-
 .../pinot/common/utils/grpc/GrpcQueryClient.java   |  26 +-
 .../common/utils/grpc/GrpcRequestBuilder.java      | 106 +++++++
 .../operator/blocks/IntermediateResultsBlock.java  |  11 +-
 .../operator/streaming/StreamingResponseUtils.java |  52 ++++
 .../StreamingSelectionOnlyCombineOperator.java     | 189 ++++++++++++
 .../streaming/StreamingSelectionOnlyOperator.java  | 111 +++++++
 .../apache/pinot/core/plan/CombinePlanNode.java    |  14 +-
 .../core/plan/StreamingSelectionPlanNode.java      |  55 ++++
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  27 +-
 .../apache/pinot/core/plan/maker/PlanMaker.java    |  15 +
 .../core/query/executor/GrpcQueryExecutor.java     | 327 +++++++++++++++++++++
 .../core/query/request/ServerQueryRequest.java     |  69 ++++-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  38 ++-
 .../combine/SelectionCombineOperatorTest.java      |   2 +-
 .../pinot/core/plan/CombinePlanNodeTest.java       |   6 +-
 .../MultiNodesOfflineClusterIntegrationTest.java   |  14 +
 .../tests/OfflineClusterIntegrationTest.java       |  98 +++++-
 .../org/apache/pinot/server/conf/ServerConf.java   |  22 +-
 .../pinot/server/starter/ServerInstance.java       |  41 ++-
 .../server/starter/grpc/PinotQueryService.java     |  23 --
 .../server/starter/helix/HelixServerStarter.java   | 171 +++++------
 23 files changed, 1255 insertions(+), 205 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 89dda00..10a1544 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -45,6 +45,7 @@ public class QueryException {
   // TODO: Handle these errors in broker
   public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
   public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
+  public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
   public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
   public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
   public static final int BROKER_GATHER_ERROR_CODE = 300;
@@ -76,6 +77,8 @@ public class QueryException {
       new ProcessingException(SERVER_SHUTTING_DOWN_ERROR_CODE);
   public static final ProcessingException SERVER_OUT_OF_CAPACITY_ERROR =
       new ProcessingException(SERVER_OUT_OF_CAPACITY_ERROR_CODE);
+  public static final ProcessingException SERVER_TABLE_MISSING_ERROR =
+      new ProcessingException(SERVER_TABLE_MISSING_ERROR_CODE);
   public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
       new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
   public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
@@ -108,6 +111,7 @@ public class QueryException {
     QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
     SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown");
     SERVER_OUT_OF_CAPACITY_ERROR.setMessage("ServerOutOfCapacity");
+    SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
     QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
     EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
     BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index e116849..ac3a796 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -105,6 +105,7 @@ public class CommonConstants {
       public static final String INSTANCE_ID_KEY = "instanceId";
       public static final String DATA_DIR_KEY = "dataDir";
       public static final String ADMIN_PORT_KEY = "adminPort";
+      public static final String GRPC_PORT_KEY = "grpcPort";
     }
 
     public static final String SET_INSTANCE_ID_TO_HOSTNAME_KEY = "pinot.set.instance.id.to.hostname";
@@ -190,7 +191,12 @@ public class CommonConstants {
     public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
     public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class";
     public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port";
+    public static final String CONFIG_OF_ENABLE_GRPC_SERVER = "pinot.server.grpc.enable";
+    public static final boolean DEFAULT_ENABLE_GRPC_SERVER = false;
+    public static final String CONFIG_OF_GRPC_PORT = "pinot.server.grpc.port";
+    public static final int DEFAULT_GRPC_PORT = 8090;
     public static final String CONFIG_OF_ADMIN_API_PORT = "pinot.server.adminapi.port";
+    public static final int DEFAULT_ADMIN_API_PORT = 8097;
 
     public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version";
     public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit";
@@ -211,7 +217,6 @@ public class CommonConstants {
         "pinot.server.starter.realtimeConsumptionCatchupWaitMs";
     public static final int DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0;
 
-    public static final int DEFAULT_ADMIN_API_PORT = 8097;
     public static final String DEFAULT_READ_MODE = "mmap";
     // Whether to reload consuming segment on schema update. Will change default behavior to true when this feature is stabilized
     public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = false;
@@ -387,4 +392,36 @@ public class CommonConstants {
     @Deprecated
     public static final String TABLE_NAME = "segment.table.name";
   }
+
+  public static class Query {
+    public static class Request {
+      public static class MetadataKeys {
+        public static final String REQUEST_ID = "requestId";
+        public static final String BROKER_ID = "brokerId";
+        public static final String ENABLE_TRACE = "enableTrace";
+        public static final String ENABLE_STREAMING = "enableStreaming";
+        public static final String PAYLOAD_TYPE = "payloadType";
+      }
+
+      public static class PayloadType {
+        public static final String SQL = "sql";
+        public static final String BROKER_REQUEST = "brokerRequest";
+      }
+    }
+
+    public static class Response {
+      public static class MetadataKeys {
+        public static final String RESPONSE_TYPE = "responseType";
+      }
+
+      public static class ResponseType {
+        // For streaming response, multiple (could be 0 if no data should be returned, or query encounters exception)
+        // data responses will be returned, followed by one single metadata response
+        public static final String DATA = "data";
+        public static final String METADATA = "metadata";
+        // For non-streaming response
+        public static final String NON_STREAMING = "nonStreaming";
+      }
+    }
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
similarity index 58%
rename from pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
rename to pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 23cff7ad..68c03b8 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -16,20 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.grpc;
+package org.apache.pinot.common.utils.grpc;
 
-import io.grpc.stub.StreamObserver;
+import io.grpc.Channel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Iterator;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
 
-/**
- * Handler for grpc server requests.
- * As data becomes available server responses will be added to the result stream.
- * Once the request is complete the client will aggregate the result metadata.
- */
-public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
-  @Override
-  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
 
+public class GrpcQueryClient {
+  private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub;
+
+  public GrpcQueryClient(String host, int port) {
+    // Set max message size to 128MB
+    Channel channel =
+        ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(128 * 1024 * 1024).usePlaintext().build();
+    _blockingStub = PinotQueryServerGrpc.newBlockingStub(channel);
+  }
+
+  public Iterator<Server.ServerResponse> submit(Server.ServerRequest request) {
+    return _blockingStub.submit(request);
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
new file mode 100644
index 0000000..0d6daeb
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+
+public class GrpcRequestBuilder {
+  private int _requestId;
+  private String _brokerId = "unknown";
+  private boolean _enableTrace;
+  private boolean _enableStreaming;
+  private String _payloadType;
+  private String _sql;
+  private BrokerRequest _brokerRequest;
+  private List<String> _segments;
+
+  public GrpcRequestBuilder setRequestId(int requestId) {
+    _requestId = requestId;
+    return this;
+  }
+
+  public GrpcRequestBuilder setBrokerId(String brokerId) {
+    _brokerId = brokerId;
+    return this;
+  }
+
+  public GrpcRequestBuilder setEnableTrace(boolean enableTrace) {
+    _enableTrace = enableTrace;
+    return this;
+  }
+
+  public GrpcRequestBuilder setEnableStreaming(boolean enableStreaming) {
+    _enableStreaming = enableStreaming;
+    return this;
+  }
+
+  public GrpcRequestBuilder setSql(String sql) {
+    _payloadType = Request.PayloadType.SQL;
+    _sql = sql;
+    return this;
+  }
+
+  public GrpcRequestBuilder setBrokerRequest(BrokerRequest brokerRequest) {
+    _payloadType = Request.PayloadType.BROKER_REQUEST;
+    _brokerRequest = brokerRequest;
+    return this;
+  }
+
+  public GrpcRequestBuilder setSegments(List<String> segments) {
+    _segments = segments;
+    return this;
+  }
+
+  public Server.ServerRequest build() {
+    Preconditions.checkState(_payloadType != null && CollectionUtils.isNotEmpty(_segments),
+        "Query and segmentsToQuery must be set");
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put(Request.MetadataKeys.REQUEST_ID, Integer.toString(_requestId));
+    metadata.put(Request.MetadataKeys.BROKER_ID, _brokerId);
+    metadata.put(Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(_enableTrace));
+    metadata.put(Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(_enableStreaming));
+    metadata.put(Request.MetadataKeys.PAYLOAD_TYPE, _payloadType);
+
+    if (_payloadType.equals(Request.PayloadType.SQL)) {
+      return Server.ServerRequest.newBuilder().putAllMetadata(metadata).setSql(_sql).addAllSegments(_segments).build();
+    } else {
+      byte[] payLoad;
+      try {
+        payLoad = new TSerializer(new TCompactProtocol.Factory()).serialize(_brokerRequest);
+      } catch (TException e) {
+        throw new RuntimeException("Caught exception while serializing broker request: " + _brokerRequest, e);
+      }
+      return Server.ServerRequest.newBuilder().putAllMetadata(metadata).setPayload(ByteString.copyFrom(payLoad))
+          .addAllSegments(_segments).build();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index d0eb2e2..79ac520 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -67,6 +67,9 @@ public class IntermediateResultsBlock implements Block {
 
   private Table _table;
 
+  public IntermediateResultsBlock() {
+  }
+
   /**
    * Constructor for selection result.
    */
@@ -270,11 +273,7 @@ public class IntermediateResultsBlock implements Block {
       return getAggregationGroupByResultDataTable();
     }
 
-    if (_processingExceptions != null && _processingExceptions.size() > 0) {
-      return getProcessingExceptionsDataTable();
-    }
-
-    throw new UnsupportedOperationException("No data inside IntermediateResultsBlock.");
+    return getMetadataDataTable();
   }
 
   private DataTable getResultDataTable()
@@ -405,7 +404,7 @@ public class IntermediateResultsBlock implements Block {
     return attachMetadataToDataTable(dataTable);
   }
 
-  private DataTable getProcessingExceptionsDataTable() {
+  private DataTable getMetadataDataTable() {
     return attachMetadataToDataTable(new DataTableImplV2());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
new file mode 100644
index 0000000..3330342
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants.Query.Response;
+import org.apache.pinot.common.utils.DataTable;
+
+
+public class StreamingResponseUtils {
+  private StreamingResponseUtils() {
+  }
+
+  public static Server.ServerResponse getDataResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.DATA);
+  }
+
+  public static Server.ServerResponse getMetadataResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.METADATA);
+  }
+
+  public static Server.ServerResponse getNonStreamingResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.NON_STREAMING);
+  }
+
+  private static Server.ServerResponse getResponse(DataTable dataTable, String responseType)
+      throws IOException {
+    return Server.ServerResponse.newBuilder().putMetadata(Response.MetadataKeys.RESPONSE_TYPE, responseType)
+        .setPayload(ByteString.copyFrom(dataTable.toBytes())).build();
+  }
+}
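
For reference, a hypothetical sketch of the response ordering these helpers implement
on the server side (the streamResults method and its arguments are illustrative, not
part of the patch): every data block goes out as a "data" response, and the stream is
closed with one single "metadata" response.

    import io.grpc.stub.StreamObserver;
    import java.io.IOException;
    import java.util.List;
    import org.apache.pinot.common.proto.Server;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;

    public class StreamingResponseSketch {
      // Stream each data block, then finish with the metadata-only block.
      public static void streamResults(List<DataTable> dataBlocks, DataTable metadataBlock,
          StreamObserver<Server.ServerResponse> observer)
          throws IOException {
        for (DataTable block : dataBlocks) {
          observer.onNext(StreamingResponseUtils.getDataResponse(block)); // responseType=data
        }
        observer.onNext(StreamingResponseUtils.getMetadataResponse(metadataBlock)); // responseType=metadata
        observer.onCompleted();
      }
    }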
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
new file mode 100644
index 0000000..619bcc5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
+import org.apache.pinot.core.query.exception.EarlyTerminationException;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for selection only streaming queries.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class);
+  private static final String OPERATOR_NAME = "StreamingSelectionOnlyCombineOperator";
+
+  // Special IntermediateResultsBlock to indicate that this is the last results block for an operator
+  private static final IntermediateResultsBlock LAST_RESULTS_BLOCK =
+      new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
+          Collections.emptyList());
+
+  private final List<Operator> _operators;
+  private final QueryContext _queryContext;
+  private final ExecutorService _executorService;
+  private final long _endTimeMs;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+  private final int _limit;
+
+  public StreamingSelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
+      ExecutorService executorService, long endTimeMs, StreamObserver<Server.ServerResponse> streamObserver) {
+    _operators = operators;
+    _queryContext = queryContext;
+    _executorService = executorService;
+    _endTimeMs = endTimeMs;
+    _streamObserver = streamObserver;
+    _limit = queryContext.getLimit();
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numOperators = _operators.size();
+    int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);
+
+    // Use a BlockingQueue to store all the results blocks
+    BlockingQueue<IntermediateResultsBlock> blockingQueue = new LinkedBlockingQueue<>();
+    // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread
+    // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is
+    // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined
+    // behavior (even JVM crash) when processing queries against it.
+    Phaser phaser = new Phaser(1);
+
+    Future[] futures = new Future[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      int threadIndex = i;
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          try {
+            // Register the thread to the phaser
+            // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that
+            //       means the query execution has finished, and the main thread has deregistered itself and returned
+            //       the result. Directly return as no execution result will be taken.
+            if (phaser.register() < 0) {
+              return;
+            }
+
+            int numRowsCollected = 0;
+            for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) {
+              Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex);
+              try {
+                IntermediateResultsBlock resultsBlock;
+                while ((resultsBlock = operator.nextBlock()) != null) {
+                  Collection<Object[]> rows = resultsBlock.getSelectionResult();
+                  assert rows != null;
+                  numRowsCollected += rows.size();
+                  blockingQueue.offer(resultsBlock);
+                  if (numRowsCollected >= _limit) {
+                    return;
+                  }
+                }
+                blockingQueue.offer(LAST_RESULTS_BLOCK);
+              } catch (EarlyTerminationException e) {
+                // Early-terminated by interruption (canceled by the main thread)
+                return;
+              } catch (Exception e) {
+                // Caught exception, skip processing the remaining operators
+                LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex,
+                    _queryContext, e);
+                blockingQueue.offer(new IntermediateResultsBlock(e));
+                return;
+              }
+            }
+          } finally {
+            phaser.arriveAndDeregister();
+          }
+        }
+      });
+    }
+
+    try {
+      int numRowsCollected = 0;
+      int numOperatorsFinished = 0;
+      while (numRowsCollected < _limit && numOperatorsFinished < numOperators) {
+        IntermediateResultsBlock resultsBlock =
+            blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        if (resultsBlock == null) {
+          // Query times out, skip streaming the remaining results blocks
+          LOGGER.error("Timed out while polling results block (query: {})", _queryContext);
+          return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+              new TimeoutException("Timed out while polling results block")));
+        }
+        if (resultsBlock.getProcessingExceptions() != null) {
+          // Caught exception while processing segment, skip streaming the remaining results blocks and directly return
+          // the exception
+          return resultsBlock;
+        }
+        if (resultsBlock == LAST_RESULTS_BLOCK) {
+          numOperatorsFinished++;
+          continue;
+        }
+        DataSchema dataSchema = resultsBlock.getDataSchema();
+        Collection<Object[]> rows = resultsBlock.getSelectionResult();
+        assert dataSchema != null && rows != null;
+        numRowsCollected += rows.size();
+        DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema);
+        _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable));
+      }
+      IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock();
+      CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators);
+      return metadataBlock;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while streaming results blocks (query: 
{})", _queryContext, e);
+      return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e);
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      // Deregister the main thread and wait for all threads done
+      phaser.awaitAdvance(phaser.arriveAndDeregister());
+    }
+  }
+}
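
The Phaser handshake in the combine operator above is subtle, so here is a stripped-down,
self-contained sketch of the same pattern (the thread count and simulated work are made
up): a worker that sees a negative value from register() knows the phaser has already
terminated and skips its work, while the main thread deregisters itself and waits for
every registered worker to arrive before releasing shared resources (the segments, in
the operator).

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.Phaser;

    public class PhaserHandshakeSketch {
      public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // party 0: the main thread
        Future<?>[] futures = new Future<?>[4];
        for (int i = 0; i < 4; i++) {
          futures[i] = executor.submit(() -> {
            // A negative phase means the phaser has terminated: the main thread
            // already deregistered and returned, so skip the work entirely.
            if (phaser.register() < 0) {
              return;
            }
            try {
              Thread.sleep(10); // simulated per-operator work
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // canceled by the main thread
            } finally {
              phaser.arriveAndDeregister();
            }
          });
        }
        // Cancel unfinished workers, then deregister the main thread and block until
        // every registered worker has arrived, so nothing the main thread is about
        // to release is still in use.
        for (Future<?> future : futures) {
          if (!future.isDone()) {
            future.cancel(true);
          }
        }
        phaser.awaitAdvance(phaser.arriveAndDeregister());
        executor.shutdown();
      }
    }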
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
new file mode 100644
index 0000000..4bb991d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "StreamingSelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final List<ExpressionContext> _expressions;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
+  private final int _limit;
+
+  private int _numDocsScanned = 0;
+
+  public StreamingSelectionOnlyOperator(IndexSegment indexSegment, QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator transformOperator) {
+    _indexSegment = indexSegment;
+    _transformOperator = transformOperator;
+    _expressions = expressions;
+
+    int numExpressions = expressions.size();
+    _blockValSets = new BlockValSet[numExpressions];
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      ExpressionContext expression = expressions.get(i);
+      TransformResultMetadata expressionMetadata = transformOperator.getResultMetadata(expression);
+      columnNames[i] = expression.toString();
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
+    }
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    _limit = queryContext.getLimit();
+  }
+
+  @Nullable
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    if (_numDocsScanned >= _limit) {
+      // Already returned enough documents
+      return null;
+    }
+    TransformBlock transformBlock = _transformOperator.nextBlock();
+    if (transformBlock == null) {
+      return null;
+    }
+    int numExpressions = _expressions.size();
+    for (int i = 0; i < numExpressions; i++) {
+      _blockValSets[i] = transformBlock.getBlockValueSet(_expressions.get(i));
+    }
+    RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(_blockValSets);
+    int numDocs = transformBlock.getNumDocs();
+    int numDocsToReturn = Math.min(_limit - _numDocsScanned, numDocs);
+    List<Object[]> rows = new ArrayList<>(numDocsToReturn);
+    for (int i = 0; i < numDocsToReturn; i++) {
+      rows.add(blockValueFetcher.getRow(i));
+    }
+    _numDocsScanned += numDocs;
+    return new IntermediateResultsBlock(_dataSchema, rows);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index b6d9155..0bce7c0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.plan;
 
+import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -25,6 +26,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator;
@@ -32,6 +35,7 @@ import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
 import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
 import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
 import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
+import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
@@ -56,6 +60,7 @@ public class CombinePlanNode implements PlanNode {
   private final ExecutorService _executorService;
   private final long _endTimeMs;
   private final int _numGroupsLimit;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
 
   /**
    * Constructor for the class.
@@ -65,14 +70,16 @@ public class CombinePlanNode implements PlanNode {
    * @param executorService Executor service
    * @param endTimeMs End time in milliseconds for the query
    * @param numGroupsLimit Limit of number of groups stored in each segment
+   * @param streamObserver Optional stream observer for streaming query
    */
   public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
-      long endTimeMs, int numGroupsLimit) {
+      long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver) {
     _planNodes = planNodes;
     _queryContext = queryContext;
     _executorService = executorService;
     _endTimeMs = endTimeMs;
     _numGroupsLimit = numGroupsLimit;
+    _streamObserver = streamObserver;
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
@@ -155,6 +162,11 @@ public class CombinePlanNode implements PlanNode {
       }
     }
 
+    if (_streamObserver != null) {
+      // Streaming query (only support selection only)
+      return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs,
+          _streamObserver);
+    }
     if (QueryContextUtils.isAggregationQuery(_queryContext)) {
       if (_queryContext.getGroupByExpressions() == null) {
         // Aggregation only
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
new file mode 100644
index 0000000..1ae28be
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+
+/**
+ * The {@code StreamingSelectionPlanNode} class provides the execution plan for streaming selection query on a single
+ * segment.
+ * <p>NOTE: ORDER-BY is ignored for streaming selection query.
+ */
+public class StreamingSelectionPlanNode implements PlanNode {
+  private final IndexSegment _indexSegment;
+  private final QueryContext _queryContext;
+  private final List<ExpressionContext> _expressions;
+  private final TransformPlanNode _transformPlanNode;
+
+  public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
+    Preconditions
+        .checkState(queryContext.getOrderByExpressions() == null, "Selection order-by is not supported for streaming");
+    _indexSegment = indexSegment;
+    _queryContext = queryContext;
+    _expressions = SelectionOperatorUtils.extractExpressions(queryContext, indexSegment);
+    _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, _expressions,
+        Math.min(queryContext.getLimit(), DocIdSetPlanNode.MAX_DOC_PER_CALL));
+  }
+
+  @Override
+  public StreamingSelectionOnlyOperator run() {
+    return new StreamingSelectionOnlyOperator(_indexSegment, _queryContext, _expressions, _transformPlanNode.run());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 746b08a..5f6ea37 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -20,10 +20,12 @@ package org.apache.pinot.core.plan.maker;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
 import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
@@ -36,6 +38,7 @@ import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.plan.SelectionPlanNode;
+import org.apache.pinot.core.plan.StreamingSelectionPlanNode;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
@@ -103,7 +106,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
       planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
     }
     CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit);
+        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null);
     return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
   }
 
@@ -139,6 +142,28 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     }
   }
 
+  @Override
+  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
+      ExecutorService executorService, StreamObserver<Server.ServerResponse> streamObserver, long endTimeMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, streamObserver);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
+    if (QueryContextUtils.isAggregationQuery(queryContext)) {
+      throw new UnsupportedOperationException("Queries with aggregations are not supported");
+    } else {
+      // Selection query
+      return new StreamingSelectionPlanNode(indexSegment, queryContext);
+    }
+  }
+
   /**
   * Returns {@code true} if the given aggregation-only without filter QueryContext can be solved with segment metadata,
    * {@code false} otherwise.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index b4a316d..9d5411a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.core.plan.maker;
 
+import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.PlanNode;
@@ -43,4 +45,17 @@ public interface PlanMaker {
   * Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment.
   */
   PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext);
+
+  /**
+   * Returns an instance level {@link Plan} for a streaming query which contains the logical execution plan for multiple
+   * segments.
+   */
+  Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
+      ExecutorService executorService, StreamObserver<Server.ServerResponse> streamObserver, long endTimeMs);
+
+  /**
+   * Returns a segment level {@link PlanNode} for a streaming query which contains the logical execution plan for one
+   * segment.
+   */
+  PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
new file mode 100644
index 0000000..e64fd34
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.executor;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.proto.PinotQueryServerGrpc;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Query executor for gRPC server requests.
+ * <ul>
+ *   <li>
+ *     For streaming request, multiple (could be 0 if no data should be returned, or query encounters exception) data
+ *     responses will be returned, followed by one single metadata response.
+ *   </li>
+ *   <li>
+ *     For non-streaming request, one single response containing both data and metadata will be returned.
+ *   </li>
+ * </ul>
+ * TODO: Plug in QueryScheduler
+ */
+public class GrpcQueryExecutor extends PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryExecutor.class);
+
+  private final InstanceDataManager _instanceDataManager;
+  private final ServerMetrics _serverMetrics;
+  private final long _defaultTimeOutMs;
+  private final SegmentPrunerService _segmentPrunerService;
+  private final PlanMaker _planMaker;
+  private final ExecutorService _executorService =
+      Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+
+  public GrpcQueryExecutor(PinotConfiguration config, InstanceDataManager instanceDataManager,
+      ServerMetrics serverMetrics)
+      throws ConfigurationException {
+    _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
+    QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
+    long defaultTimeoutMs = queryExecutorConfig.getTimeOut();
+    _defaultTimeOutMs =
+        defaultTimeoutMs > 0 ? defaultTimeoutMs : CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+    _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
+    _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
+    LOGGER.info("Initialized PinotQueryHandler with default timeout: {}ms, numWorkerThreads: {}", _defaultTimeOutMs,
+        ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+  }
+
+  @Override
+  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
+    // Deserialize the request
+    ServerQueryRequest queryRequest;
+    try {
+      queryRequest = new ServerQueryRequest(request, _serverMetrics);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while deserializing the request: {}", 
request, e);
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS,
 1);
+      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad 
request").withCause(e).asException());
+      return;
+    }
+
+    // Process the query
+    try {
+      processQuery(queryRequest, responseObserver);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing request {}: {} from 
broker: {}", queryRequest.getRequestId(),
+          queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
+    }
+  }
+
+  private void processQuery(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver) {
+    TimerContext timerContext = queryRequest.getTimerContext();
+    TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
+    if (schedulerWaitTimer != null) {
+      schedulerWaitTimer.stopAndRecord();
+    }
+    long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
+    long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
+    TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);
+
+    long requestId = queryRequest.getRequestId();
+    String tableNameWithType = queryRequest.getTableNameWithType();
+    QueryContext queryContext = queryRequest.getQueryContext();
+    LOGGER.debug("Incoming request Id: {}, query: {}", requestId, 
queryContext);
+    // Use the timeout passed from the request if exists, or the 
instance-level timeout
+    long queryTimeoutMs = _defaultTimeOutMs;
+    Map<String, String> queryOptions = queryContext.getQueryOptions();
+    if (queryOptions != null) {
+      Long timeoutFromQueryOptions = QueryOptions.getTimeoutMs(queryOptions);
+      if (timeoutFromQueryOptions != null) {
+        queryTimeoutMs = timeoutFromQueryOptions;
+      }
+    }
+
+    // Query scheduler wait time already exceeds query timeout, directly return
+    if (querySchedulingTimeMs >= queryTimeoutMs) {
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
+      String errorMessage = String
+          .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
+              queryTimeoutMs);
+      DataTable dataTable = new DataTableImplV2();
+      dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage));
+      LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
+      sendResponse(queryRequest, streamObserver, dataTable);
+      return;
+    }
+
+    TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
+    if (tableDataManager == null) {
+      String errorMessage = "Failed to find table: " + tableNameWithType;
+      DataTable dataTable = new DataTableImplV2();
+      dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage));
+      LOGGER.error("{} while processing requestId: {}", errorMessage, requestId);
+      sendResponse(queryRequest, streamObserver, dataTable);
+      return;
+    }
+
+    List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery);
+
+    // When segment is removed from the IdealState:
+    // 1. Controller schedules a state transition to server to turn segment OFFLINE
+    // 2. Server gets the state transition, removes the segment data manager and update its CurrentState
+    // 3. Controller gathers the CurrentState and update the ExternalView
+    // 4. Broker watches ExternalView change and updates the routing table to stop querying the segment
+    //
+    // After step 2 but before step 4, segment will be missing on server side
+    // TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments
+    int numSegmentsQueried = segmentsToQuery.size();
+    int numSegmentsAcquired = segmentDataManagers.size();
+    if (numSegmentsQueried > numSegmentsAcquired) {
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS,
+          numSegmentsQueried - numSegmentsAcquired);
+    }
+
+    boolean enableTrace = queryRequest.isEnableTrace();
+    if (enableTrace) {
+      TraceContext.register(requestId);
+    }
+
+    int numConsumingSegmentsProcessed = 0;
+    long minIndexTimeMs = Long.MAX_VALUE;
+    long minIngestionTimeMs = Long.MAX_VALUE;
+    // gather stats for realtime consuming segments
+    for (SegmentDataManager segmentMgr : segmentDataManagers) {
+      if (segmentMgr.getSegment() instanceof MutableSegment) {
+        numConsumingSegmentsProcessed += 1;
+        SegmentMetadata metadata = segmentMgr.getSegment().getSegmentMetadata();
+        long indexedTime = metadata.getLastIndexedTimestamp();
+        if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
+          minIndexTimeMs = metadata.getLastIndexedTimestamp();
+        }
+        long ingestionTime = metadata.getLatestIngestionTimestamp();
+        if (ingestionTime != Long.MIN_VALUE && ingestionTime < minIngestionTimeMs) {
+          minIngestionTimeMs = ingestionTime;
+        }
+      }
+    }
+
+    long minConsumingFreshnessTimeMs = minIngestionTimeMs;
+    if (numConsumingSegmentsProcessed > 0) {
+      if (minIngestionTimeMs == Long.MAX_VALUE) {
+        LOGGER.debug("Did not find valid ingestionTimestamp across consuming 
segments! Using indexTime instead");
+        minConsumingFreshnessTimeMs = minIndexTimeMs;
+      }
+      LOGGER
+          .debug("Querying: {} consuming segments with 
minConsumingFreshnessTimeMs: {}", numConsumingSegmentsProcessed,
+              minConsumingFreshnessTimeMs);
+    }
+
+    DataTable dataTable = null;
+    try {
+      // Compute total docs for the table before pruning the segments
+      long numTotalDocs = 0;
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        numTotalDocs += segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
+      }
+      TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+      segmentDataManagers = _segmentPrunerService.prune(tableDataManager, segmentDataManagers, queryRequest);
+      segmentPruneTimer.stopAndRecord();
+      int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
+      LOGGER.debug("Matched {} segments after pruning", 
numSegmentsMatchedAfterPruning);
+      if (numSegmentsMatchedAfterPruning == 0) {
+        dataTable =
+            queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
+        Map<String, String> metadata = dataTable.getMetadata();
+        metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
+        metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+        metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+        metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
+        metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+        metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+      } else {
+        TimerContext.Timer planBuildTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+        List<IndexSegment> indexSegments = new 
ArrayList<>(numSegmentsMatchedAfterPruning);
+        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+          indexSegments.add(segmentDataManager.getSegment());
+        }
+        long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
+        Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
+            .makeStreamingInstancePlan(indexSegments, queryContext, 
_executorService, streamObserver, endTimeMs)
+            : _planMaker.makeInstancePlan(indexSegments, queryContext, 
_executorService, endTimeMs);
+        planBuildTimer.stopAndRecord();
+
+        TimerContext.Timer planExecTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+        dataTable = globalQueryPlan.execute();
+        planExecTimer.stopAndRecord();
+
+        // Update the total docs in the metadata based on un-pruned segments.
+        dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, 
Long.toString(numTotalDocs));
+      }
+    } catch (Exception e) {
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
+
+      // Do not log the error for BadQueryRequestException because it is caused by a bad query
+      if (e instanceof BadQueryRequestException) {
+        LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
+      } else {
+        LOGGER.error("Exception processing requestId {}", requestId, e);
+      }
+
+      dataTable = new DataTableImplV2();
+      dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+    } finally {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+      if (enableTrace) {
+        if (dataTable != null) {
+          dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY, TraceContext.getTraceInfo());
+        }
+        TraceContext.unregister();
+      }
+    }
+
+    queryProcessingTimer.stopAndRecord();
+    long queryProcessingTime = queryProcessingTimer.getDurationMs();
+    dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried));
+    dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime));
+
+    if (numConsumingSegmentsProcessed > 0) {
+      dataTable.getMetadata()
+          .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.toString(numConsumingSegmentsProcessed));
+      dataTable.getMetadata()
+          .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.toString(minConsumingFreshnessTimeMs));
+    }
+
+    LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime);
+    LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
+
+    // TODO: Log query stats
+
+    sendResponse(queryRequest, streamObserver, dataTable);
+  }
+
+  private void sendResponse(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver,
+      DataTable dataTable) {
+    Server.ServerResponse response;
+    try {
+      response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable)
+          : StreamingResponseUtils.getNonStreamingResponse(dataTable);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}",
+          queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
+      streamObserver.onError(Status.INTERNAL.withCause(e).asException());
+      return;
+    }
+    streamObserver.onNext(response);
+    streamObserver.onCompleted();
+  }
+}
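Note on the response protocol implemented by sendResponse() above: in streaming mode the server emits one DATA response per block and finishes with a single metadata-only response, while in non-streaming mode it emits exactly one response carrying both rows and stats. A minimal client-side sketch of consuming that protocol, mirroring the integration test further below (host, port, query and the handleRows/handleStats helpers are placeholders; checked exceptions are elided):

    GrpcQueryClient client = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT);
    Server.ServerRequest request = new GrpcRequestBuilder()
        .setSegments(segments)     // segments to scan; the test fetches them from the Helix resource manager
        .setEnableStreaming(true)  // one response per data block plus a trailing metadata block
        .setSql("SELECT * FROM mytable_OFFLINE LIMIT 10")
        .build();
    Iterator<Server.ServerResponse> responses = client.submit(request);
    while (responses.hasNext()) {
      Server.ServerResponse response = responses.next();
      String responseType = response.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
      DataTable dataTable = DataTableFactory.getDataTable(response.getPayload().asReadOnlyByteBuffer());
      if (CommonConstants.Query.Response.ResponseType.DATA.equals(responseType)) {
        handleRows(dataTable);                 // DATA block: rows only, empty metadata map
      } else {
        handleStats(dataTable.getMetadata());  // trailing METADATA block: stats only, no rows
      }
    }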
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 1763335..24f2fa2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -19,14 +19,20 @@
 package org.apache.pinot.core.query.request;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TCompactProtocol;
 
 
 /**
@@ -36,11 +42,14 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
  * per segment basis.
  */
 public class ServerQueryRequest {
+  private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+
   private final long _requestId;
+  private final String _brokerId;
+  private final boolean _enableTrace;
+  private final boolean _enableStreaming;
   private final String _tableNameWithType;
   private final List<String> _segmentsToQuery;
-  private final boolean _enableTrace;
-  private final String _brokerId;
 
   // Timing information for different phases of query execution
   private final TimerContext _timerContext;
@@ -51,11 +60,12 @@ public class ServerQueryRequest {
 
   public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics serverMetrics, long queryArrivalTimeMs) {
     _requestId = instanceRequest.getRequestId();
+    _brokerId = instanceRequest.getBrokerId() != null ? instanceRequest.getBrokerId() : "unknown";
+    _enableTrace = instanceRequest.isEnableTrace();
+    _enableStreaming = false;
     BrokerRequest brokerRequest = instanceRequest.getQuery();
     _tableNameWithType = brokerRequest.getQuerySource().getTableName();
     _segmentsToQuery = instanceRequest.getSearchSegments();
-    _enableTrace = instanceRequest.isEnableTrace();
-    _brokerId = instanceRequest.getBrokerId() != null ? instanceRequest.getBrokerId() : "unknown";
     _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs);
 
     // Pre-compute segment independent information
@@ -63,24 +73,59 @@ public class ServerQueryRequest {
     _allColumns = QueryContextUtils.getAllColumns(_queryContext);
   }
 
-  public long getRequestId() {
-    return _requestId;
+  public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serverMetrics)
+      throws Exception {
+    long queryArrivalTimeMs = System.currentTimeMillis();
+
+    Map<String, String> metadata = serverRequest.getMetadataMap();
+    _requestId = Long.parseLong(metadata.getOrDefault(Request.MetadataKeys.REQUEST_ID, "0"));
+    _brokerId = metadata.getOrDefault(Request.MetadataKeys.BROKER_ID, "unknown");
+    _enableTrace = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_TRACE));
+    _enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING));
+
+    BrokerRequest brokerRequest;
+    String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL);
+    if (payloadType.equalsIgnoreCase(Request.PayloadType.SQL)) {
+      brokerRequest = SQL_COMPILER.compileToBrokerRequest(serverRequest.getSql());
+    } else if (payloadType.equalsIgnoreCase(Request.PayloadType.BROKER_REQUEST)) {
+      brokerRequest = new BrokerRequest();
+      new TDeserializer(new TCompactProtocol.Factory())
+          .deserialize(brokerRequest, serverRequest.getPayload().toByteArray());
+    } else {
+      throw new UnsupportedOperationException("Unsupported payloadType: " + payloadType);
+    }
+
+    _tableNameWithType = brokerRequest.getQuerySource().getTableName();
+    _segmentsToQuery = serverRequest.getSegmentsList();
+    _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs);
+
+    // Pre-compute segment independent information
+    _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    _allColumns = QueryContextUtils.getAllColumns(_queryContext);
   }
 
-  public String getTableNameWithType() {
-    return _tableNameWithType;
+  public long getRequestId() {
+    return _requestId;
   }
 
-  public List<String> getSegmentsToQuery() {
-    return _segmentsToQuery;
+  public String getBrokerId() {
+    return _brokerId;
   }
 
   public boolean isEnableTrace() {
     return _enableTrace;
   }
 
-  public String getBrokerId() {
-    return _brokerId;
+  public boolean isEnableStreaming() {
+    return _enableStreaming;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  public List<String> getSegmentsToQuery() {
+    return _segmentsToQuery;
   }
 
   public TimerContext getTimerContext() {
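The new gRPC-based constructor above derives everything except the SQL/BrokerRequest payload and the segment list from the request metadata map. A sketch of the metadata a caller would populate, which GrpcRequestBuilder normally fills in (the literal key strings live behind the Request.MetadataKeys constants and are not shown in this diff; "123" and "broker-1" are placeholder values):

    Map<String, String> metadata = new HashMap<>();
    metadata.put(Request.MetadataKeys.REQUEST_ID, "123");      // parsed with a default of 0 when absent
    metadata.put(Request.MetadataKeys.BROKER_ID, "broker-1");  // defaults to "unknown" when absent
    metadata.put(Request.MetadataKeys.ENABLE_TRACE, "false");
    metadata.put(Request.MetadataKeys.ENABLE_STREAMING, "true");
    metadata.put(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL);  // SQL is also the default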
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
similarity index 54%
rename from pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java
rename to pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index e0fef72..7413070 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -16,28 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.conf;
+package org.apache.pinot.core.transport.grpc;
 
-import java.util.Optional;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import java.io.IOException;
+import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.spi.env.PinotConfiguration;
 
+public class GrpcQueryServer {
+  private final Server _server;
 
-public class NettyServerConfig {
-  private static String NETTY_SERVER_PORT = "port";
-
-  private final int port;
+  public GrpcQueryServer(int port, GrpcQueryExecutor queryExecutor) {
+    _server = ServerBuilder.forPort(port).addService(queryExecutor).build();
+  }
 
-  public NettyServerConfig(PinotConfiguration serverNettyConfig) throws ConfigurationException {
-    this.port = Optional.ofNullable(serverNettyConfig.getProperty(NETTY_SERVER_PORT, Integer.class))
-        .orElseThrow(() -> new ConfigurationException("Cannot find Key : " + NETTY_SERVER_PORT));
+  public void start() {
+    try {
+      _server.start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
-  /**
-   * @return Netty server port
-   */
-  public int getPort() {
-    return port;
+  public void shutdown() {
+    try {
+      _server.shutdown().awaitTermination();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 }
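The renamed class is intentionally thin: the constructor binds the executor as the gRPC service, start() converts the checked IOException into a RuntimeException, and shutdown() blocks until in-flight calls drain via shutdown().awaitTermination(). A lifecycle sketch under those semantics (queryExecutorConfig, instanceDataManager and serverMetrics are assumed to come from the surrounding server initialization, as in the ServerInstance changes below):

    GrpcQueryExecutor grpcQueryExecutor = new GrpcQueryExecutor(queryExecutorConfig, instanceDataManager, serverMetrics);
    GrpcQueryServer grpcQueryServer = new GrpcQueryServer(CommonConstants.Server.DEFAULT_GRPC_PORT, grpcQueryExecutor);
    grpcQueryServer.start();    // throws RuntimeException if the port cannot be bound
    // ... serve queries ...
    grpcQueryServer.shutdown(); // blocks until in-flight calls terminate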
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index 5ee3da2..e45b441 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -230,7 +230,7 @@ public class SelectionCombineOperatorTest {
     }
     CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
         System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
     return combinePlanNode.run().nextBlock();
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
index b0d5d13..11053c7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java
@@ -58,7 +58,7 @@ public class CombinePlanNodeTest {
       }
       CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
           System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+          InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
       combinePlanNode.run();
       Assert.assertEquals(numPlans, count.get());
     }
@@ -83,7 +83,7 @@ public class CombinePlanNodeTest {
     }
     CombinePlanNode combinePlanNode =
         new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
-            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+            InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
     try {
       combinePlanNode.run();
     } catch (RuntimeException e) {
@@ -105,7 +105,7 @@ public class CombinePlanNodeTest {
     }
     CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
         System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
-        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
+        InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null);
     try {
       combinePlanNode.run();
     } catch (RuntimeException e) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index c66a6f2..c1a3a85 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.integration.tests;
 
+import org.testng.annotations.Test;
+
+
 /**
  * Integration test that extends OfflineClusterIntegrationTest but starts multiple brokers and servers.
  */
@@ -34,4 +37,15 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
   protected int getNumServers() {
     return NUM_SERVERS;
   }
+
+  @Override
+  protected void startServers() {
+    startServers(NUM_SERVERS);
+  }
+
+  @Test(enabled = false)
+  @Override
+  public void testGrpcQueryServer() {
+    // Ignored
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7475715..790968a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -30,21 +30,31 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -130,7 +140,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startZk();
     startController();
     startBrokers(getNumBrokers());
-    startServers(getNumServers());
+    startServers();
 
     // Create and upload the schema and table config
     Schema schema = createSchema();
@@ -160,6 +170,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     waitForAllDocsLoaded(600_000L);
   }
 
+  protected void startServers() {
+    // Enable gRPC server
+    PinotConfiguration serverConfig = getDefaultServerConfiguration();
+    serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
+    startServer(serverConfig);
+  }
+
   private void registerCallbackHandlers() {
     List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName());
     instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance
@@ -973,15 +990,18 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   public void testQueryWithSameAlias()
       throws Exception {
     //test repeated columns in selection query
-    String query = "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
+    String query =
+        "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
     testQuery(query, Collections.singletonList(query));
 
     //test repeated columns in selection query
-    query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC";
+    query =
+        "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC";
     testQuery(query, Collections.singletonList(query));
 
     //test repeated columns in selection query
-    query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
+    query =
+        "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC";
     testQuery(query, Collections.singletonList(query));
   }
 
@@ -1124,11 +1144,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     sql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, 
DestAirportID";
     testSqlQuery(pql, Collections.singletonList(sql));
 
-    pql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY 
Carrier, DestAirportID, DestStateName LIMIT 1000000";
+    pql =
+        "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY 
Carrier, DestAirportID, DestStateName LIMIT 1000000";
     sql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY 
Carrier, DestAirportID, DestStateName";
     testSqlQuery(pql, Collections.singletonList(sql));
 
-    pql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY 
Carrier, DestAirportID, DestCityName LIMIT 1000000";
+    pql =
+        "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY 
Carrier, DestAirportID, DestCityName LIMIT 1000000";
     sql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY 
Carrier, DestAirportID, DestCityName";
     testSqlQuery(pql, Collections.singletonList(sql));
 
@@ -1160,7 +1182,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
     baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
-    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+    baseQueries
+        .forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
 
     for (String query : queries) {
       try {
@@ -1213,8 +1236,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM 
mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY 
dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
-    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
-    baseQueries.forEach(q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+    baseQueries
+        .forEach(q -> queries.add(q.replace("mytable", 
"MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+    baseQueries.forEach(
+        q -> queries.add(q.replace("mytable", 
"MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
 
     for (String query : queries) {
       try {
@@ -1279,6 +1304,61 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     query = "SELECT c_o_u_n_t(FlightNum) FROM mytable ";
     
assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(),
 115545);
     assertEquals(postSqlQuery(query, 
_brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), 
115545);
+  }
+
+  @Test
+  public void testGrpcQueryServer()
+      throws Exception {
+    GrpcQueryClient queryClient = new GrpcQueryClient("localhost", 
CommonConstants.Server.DEFAULT_GRPC_PORT);
+    String sql = "SELECT * FROM mytable_OFFLINE LIMIT 1000000";
+    BrokerRequest brokerRequest = new 
Pql2Compiler().compileToBrokerRequest(sql);
+    List<String> segments = 
_helixResourceManager.getSegmentsFor("mytable_OFFLINE");
+
+    GrpcRequestBuilder requestBuilder = new 
GrpcRequestBuilder().setSegments(segments);
+    
testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build()));
+    
testNonStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));
+
+    requestBuilder.setEnableStreaming(true);
+    
testStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build()));
+    
testStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build()));
+  }
+
+  private void testNonStreamingRequest(Iterator<Server.ServerResponse> nonStreamingResponses)
+      throws Exception {
+    int expectedNumDocs = (int) getCountStarResult();
+    assertTrue(nonStreamingResponses.hasNext());
+    Server.ServerResponse nonStreamingResponse = nonStreamingResponses.next();
+    assertEquals(nonStreamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE),
+        CommonConstants.Query.Response.ResponseType.NON_STREAMING);
+    DataTable dataTable = DataTableFactory.getDataTable(nonStreamingResponse.getPayload().asReadOnlyByteBuffer());
+    assertNotNull(dataTable.getDataSchema());
+    assertEquals(dataTable.getNumberOfRows(), expectedNumDocs);
+    Map<String, String> metadata = dataTable.getMetadata();
+    assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs));
+  }
 
+  private void testStreamingRequest(Iterator<Server.ServerResponse> streamingResponses)
+      throws Exception {
+    int expectedNumDocs = (int) getCountStarResult();
+    int numTotalDocs = 0;
+    while (streamingResponses.hasNext()) {
+      Server.ServerResponse streamingResponse = streamingResponses.next();
+      DataTable dataTable = DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer());
+      String responseType =
+          streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
+      if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
+        assertTrue(dataTable.getMetadata().isEmpty());
+        assertNotNull(dataTable.getDataSchema());
+        numTotalDocs += dataTable.getNumberOfRows();
+      } else {
+        assertEquals(responseType, CommonConstants.Query.Response.ResponseType.METADATA);
+        assertFalse(streamingResponses.hasNext());
+        assertEquals(numTotalDocs, expectedNumDocs);
+        assertNull(dataTable.getDataSchema());
+        assertEquals(dataTable.getNumberOfRows(), 0);
+        Map<String, String> metadata = dataTable.getMetadata();
+        assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs));
+      }
+    }
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index e7c9a97..1b5087f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -20,9 +20,8 @@ package org.apache.pinot.server.conf;
 
 import java.util.Arrays;
 import java.util.List;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -30,6 +29,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  * The config used for Server.
  */
 public class ServerConf {
+  // TODO: Replace with constants in CommonConstants
   private static final String PINOT_ = "pinot.";
   private static final String PINOT_SERVER_INSTANCE = "pinot.server.instance";
   private static final String PINOT_SERVER_METRICS = "pinot.server.metrics";
@@ -37,7 +37,6 @@ public class ServerConf {
   private static final String PINOT_SERVER_TABLE_LEVEL_METRICS = "pinot.server.enableTableLevelMetrics";
   private static final String PINOT_SERVER_QUERY = "pinot.server.query.executor";
   private static final String PINOT_SERVER_REQUEST = "pinot.server.request";
-  private static final String PINOT_SERVER_NETTY = "pinot.server.netty";
   private static final String PINOT_SERVER_INSTANCE_DATA_MANAGER_CLASS = "pinot.server.instance.data.manager.class";
   private static final String PINOT_SERVER_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
   private static final String PINOT_SERVER_TRANSFORM_FUNCTIONS = "pinot.server.transforms";
@@ -70,9 +69,16 @@ public class ServerConf {
     return _serverConf.subset(PINOT_SERVER_METRICS);
   }
 
-  public NettyServerConfig getNettyConfig()
-      throws ConfigurationException {
-    return new NettyServerConfig(_serverConf.subset(PINOT_SERVER_NETTY));
+  public int getNettyPort() {
+    return _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT);
+  }
+
+  public boolean isEnableGrpcServer() {
+    return _serverConf.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, Server.DEFAULT_ENABLE_GRPC_SERVER);
+  }
+
+  public int getGrpcPort() {
+    return _serverConf.getProperty(Server.CONFIG_OF_GRPC_PORT, Server.DEFAULT_GRPC_PORT);
   }
 
   public PinotConfiguration getConfig(String component) {
@@ -104,6 +110,6 @@ public class ServerConf {
   }
 
   public String getMetricsPrefix() {
-    return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX, CommonConstants.Server.DEFAULT_METRICS_PREFIX);
+    return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX, Server.DEFAULT_METRICS_PREFIX);
   }
 }
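A sketch of toggling the new hooks through server properties, mirroring how the integration test above enables gRPC (the literal key strings behind CONFIG_OF_ENABLE_GRPC_SERVER and CONFIG_OF_GRPC_PORT are defined in CommonConstants and not shown in this diff; 8090 is a placeholder port):

    Map<String, Object> properties = new HashMap<>();
    properties.put(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
    properties.put(CommonConstants.Server.CONFIG_OF_GRPC_PORT, 8090);  // optional; DEFAULT_GRPC_PORT otherwise
    PinotConfiguration serverConf = new PinotConfiguration(properties);
    // ServerConf#isEnableGrpcServer() and ServerConf#getGrpcPort() then drive the wiring in ServerInstance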
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index ae448ad..fa1903c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -30,11 +30,14 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
+import org.apache.pinot.core.query.executor.GrpcQueryExecutor;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
 import org.apache.pinot.core.transport.QueryServer;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
 import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +54,8 @@ public class ServerInstance {
   private final QueryExecutor _queryExecutor;
   private final LongAccumulator _latestQueryTime;
   private final QueryScheduler _queryScheduler;
-  private final QueryServer _queryServer;
+  private final QueryServer _nettyQueryServer;
+  private final GrpcQueryServer _grpcQueryServer;
 
   private boolean _started = false;
 
@@ -77,16 +81,27 @@ public class ServerInstance {
     String queryExecutorClassName = serverConf.getQueryExecutorClassName();
     LOGGER.info("Initializing query executor of class: {}", 
queryExecutorClassName);
     _queryExecutor = (QueryExecutor) 
Class.forName(queryExecutorClassName).newInstance();
-    _queryExecutor.init(serverConf.getQueryExecutorConfig(), 
_instanceDataManager, _serverMetrics);
+    PinotConfiguration queryExecutorConfig = 
serverConf.getQueryExecutorConfig();
+    _queryExecutor.init(queryExecutorConfig, _instanceDataManager, 
_serverMetrics);
 
     LOGGER.info("Initializing query scheduler");
     _latestQueryTime = new LongAccumulator(Long::max, 0);
     _queryScheduler =
         QuerySchedulerFactory.create(serverConf.getSchedulerConfig(), 
_queryExecutor, _serverMetrics, _latestQueryTime);
 
-    int queryServerPort = serverConf.getNettyConfig().getPort();
-    LOGGER.info("Initializing query server on port: {}", queryServerPort);
-    _queryServer = new QueryServer(queryServerPort, _queryScheduler, 
_serverMetrics);
+    int nettyPort = serverConf.getNettyPort();
+    LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
+    _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, 
_serverMetrics);
+
+    if (serverConf.isEnableGrpcServer()) {
+      int grpcPort = serverConf.getGrpcPort();
+      LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
+      GrpcQueryExecutor grpcQueryExecutor =
+          new GrpcQueryExecutor(queryExecutorConfig, _instanceDataManager, 
_serverMetrics);
+      _grpcQueryServer = new GrpcQueryServer(grpcPort, grpcQueryExecutor);
+    } else {
+      _grpcQueryServer = null;
+    }
 
     LOGGER.info("Initializing transform functions");
     Set<Class<TransformFunction>> transformFunctionClasses = new HashSet<>();
@@ -119,8 +134,12 @@ public class ServerInstance {
     _queryExecutor.start();
     LOGGER.info("Starting query scheduler");
     _queryScheduler.start();
-    LOGGER.info("Starting query server");
-    _queryServer.start();
+    LOGGER.info("Starting Netty query server");
+    _nettyQueryServer.start();
+    if (_grpcQueryServer != null) {
+      LOGGER.info("Starting gRPC query server");
+      _grpcQueryServer.start();
+    }
 
     _started = true;
     LOGGER.info("Finish starting server instance");
@@ -130,8 +149,12 @@ public class ServerInstance {
     Preconditions.checkState(_started, "Server instance is not running");
     LOGGER.info("Shutting down server instance");
 
-    LOGGER.info("Shutting down query server");
-    _queryServer.shutDown();
+    if (_grpcQueryServer != null) {
+      LOGGER.info("Shutting down gRPC query server");
+      _grpcQueryServer.shutdown();
+    }
+    LOGGER.info("Shutting down Netty query server");
+    _nettyQueryServer.shutDown();
     LOGGER.info("Shutting down query scheduler");
     _queryScheduler.stop();
     LOGGER.info("Shutting down query executor");
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
deleted file mode 100644
index bee45de..0000000
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.server.starter.grpc;
-
-public class PinotQueryService {
-
-}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index abe2275..b419d83 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -18,54 +18,17 @@
  */
 package org.apache.pinot.server.starter.helix;
 
-import static org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
-import static org.apache.pinot.common.utils.CommonConstants.Server.ACCESS_CONTROL_FACTORY_CLASS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
-import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -85,9 +48,10 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.Instance;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel;
+import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -110,9 +74,6 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
 
 /**
  * Starter for Pinot server.
@@ -142,6 +103,7 @@ public class HelixServerStarter implements ServiceStartable {
   private final String _host;
   private final int _port;
   private final String _instanceId;
+  private final HelixConfigScope _instanceConfigScope;
   private HelixManager _helixManager;
   private HelixAdmin _helixAdmin;
   private ServerInstance _serverInstance;
@@ -155,23 +117,21 @@ public class HelixServerStarter implements ServiceStartable {
     // Make a clone so that changes to the config won't propagate to the caller
     _serverConf = serverConf.clone();
 
-    _host = _serverConf.getProperty(KEY_OF_SERVER_NETTY_HOST,
-        _serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtil
-            .getHostnameOrAddress() : NetUtil.getHostAddress());
-    _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT, DEFAULT_SERVER_NETTY_PORT);
+    _host = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
+        _serverConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtil.getHostnameOrAddress()
+            : NetUtil.getHostAddress());
+    _port = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT);
 
-    _instanceId = Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID))
-
-        // InstanceId is not configured. Fallback to an auto generated config.
-        .orElseGet(this::initializeDefaultInstanceId);
-  }
-
-  private String initializeDefaultInstanceId() {
-    String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
-
-    _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, instanceId);
+    String instanceId = _serverConf.getProperty(Server.CONFIG_OF_INSTANCE_ID);
+    if (instanceId == null) {
+      instanceId = Helix.PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port;
+      _serverConf.addProperty(Server.CONFIG_OF_INSTANCE_ID, instanceId);
+    }
+    _instanceId = instanceId;
 
-    return instanceId;
+    _instanceConfigScope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId)
+            .build();
   }
 
   /**
@@ -179,9 +139,11 @@ public class HelixServerStarter implements ServiceStartable {
    */
   private void registerServiceStatusHandler() {
     double minResourcePercentForStartup = _serverConf
-        .getProperty(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
-    int realtimeConsumptionCatchupWaitMs = _serverConf.getProperty(CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
-        DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
+        .getProperty(Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START,
+            Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START);
+    int realtimeConsumptionCatchupWaitMs = _serverConf
+        .getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS,
+            Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS);
 
     // collect all resources which have this instance in the ideal state
     List<String> resourcesToMonitor = new ArrayList<>();
@@ -235,25 +197,6 @@ public class HelixServerStarter implements ServiceStartable {
         new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
   }
 
-  private void setAdminApiPort(int adminApiPort) {
-    Map<String, String> propToUpdate = new HashMap<>();
-    propToUpdate.put(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort));
-    updateInstanceConfigInHelix(propToUpdate);
-  }
-
-  private void setShuttingDownStatus(boolean shuttingDownStatus) {
-    Map<String, String> propToUpdate = new HashMap<>();
-    propToUpdate.put(IS_SHUTDOWN_IN_PROGRESS, String.valueOf(shuttingDownStatus));
-    updateInstanceConfigInHelix(propToUpdate);
-  }
-
-  private void updateInstanceConfigInHelix(Map<String, String> props) {
-    HelixConfigScope scope =
-        new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId)
-            .build();
-    _helixAdmin.setConfig(scope, props);
-  }
-
   private void updateInstanceConfigIfNeeded(String host, int port) {
     InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, _instanceId);
     boolean needToUpdateInstanceConfig = false;
@@ -265,7 +208,7 @@ public class HelixServerStarter implements ServiceStartable {
         instanceConfig.addTag(TagNameUtils.getOfflineTagForTenant(null));
         instanceConfig.addTag(TagNameUtils.getRealtimeTagForTenant(null));
       } else {
-        instanceConfig.addTag(UNTAGGED_SERVER_INSTANCE);
+        instanceConfig.addTag(Helix.UNTAGGED_SERVER_INSTANCE);
       }
       needToUpdateInstanceConfig = true;
     }
@@ -303,7 +246,7 @@ public class HelixServerStarter implements ServiceStartable {
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
     System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
-        _serverConf.getProperty(CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, DEFAULT_FLAPPING_TIME_WINDOW_MS));
+        _serverConf.getProperty(Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
 
   /**
@@ -314,8 +257,8 @@ public class HelixServerStarter implements ServiceStartable {
   private void startupServiceStatusCheck(long endTimeMs) {
     LOGGER.info("Starting startup service status check");
     long startTimeMs = System.currentTimeMillis();
-    long checkIntervalMs = _serverConf
-        .getProperty(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
+    long checkIntervalMs = _serverConf.getProperty(Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS,
+        Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS);
 
     while (System.currentTimeMillis() < endTimeMs) {
       Status serviceStatus = ServiceStatus.getServiceStatus();
@@ -370,7 +313,8 @@ public class HelixServerStarter implements ServiceStartable {
     _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
-    SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
+    SegmentFetcherAndLoader fetcherAndLoader =
+        new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader);
     _helixManager.getStateMachineEngine()
@@ -386,7 +330,7 @@ public class HelixServerStarter implements ServiceStartable {
 
     // Start restlet server for admin API endpoint
     String accessControlFactoryClass =
-        _serverConf.getProperty(ACCESS_CONTROL_FACTORY_CLASS, DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
+        _serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS, Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
     LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
     final AccessControlFactory accessControlFactory;
     try {
@@ -397,10 +341,20 @@ public class HelixServerStarter implements ServiceStartable {
               + "'", e);
     }
 
-    int adminApiPort = _serverConf.getProperty(CONFIG_OF_ADMIN_API_PORT, DEFAULT_ADMIN_API_PORT);
+    // Update admin API port
+    int adminApiPort = _serverConf.getProperty(Server.CONFIG_OF_ADMIN_API_PORT, Server.DEFAULT_ADMIN_API_PORT);
     _adminApiApplication = new AdminApiApplication(_serverInstance, accessControlFactory);
     _adminApiApplication.start(adminApiPort);
-    setAdminApiPort(adminApiPort);
+    _helixAdmin.setConfig(_instanceConfigScope,
+        Collections.singletonMap(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort)));
+
+    // Update gRPC port
+    if (serverInstanceConfig.isEnableGrpcServer()) {
+      _helixAdmin.setConfig(_instanceConfigScope,
+          Collections.singletonMap(Instance.GRPC_PORT_KEY, String.valueOf(serverInstanceConfig.getGrpcPort())));
+    } else {
+      _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.GRPC_PORT_KEY));
+    }
 
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
@@ -408,19 +362,21 @@ public class HelixServerStarter implements ServiceStartable {
     _helixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
 
-    serverMetrics.addCallbackGauge(INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
+    serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
     _helixManager
         .addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
 
     // Register the service status handler
     registerServiceStatusHandler();
 
-    if (_serverConf
-        .getProperty(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
-      long endTimeMs = startTimeMs + _serverConf.getProperty(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
+    if (_serverConf.getProperty(Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK,
+        Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
+      long endTimeMs =
+          startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS, Server.DEFAULT_STARTUP_TIMEOUT_MS);
       startupServiceStatusCheck(endTimeMs);
     }
-    setShuttingDownStatus(false);
+    _helixAdmin.setConfig(_instanceConfigScope,
+        Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false)));
     LOGGER.info("Pinot server ready");
 
     // Create metrics for mmap stuff
@@ -446,15 +402,19 @@ public class HelixServerStarter implements ServiceStartable {
       LOGGER.warn("Caught exception closing PinotFS classes", e);
     }
     _adminApiApplication.stop();
-    setShuttingDownStatus(true);
+    _helixAdmin.setConfig(_instanceConfigScope,
+        Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true)));
 
-    long endTimeMs = startTimeMs + _serverConf.getProperty(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS);
-    if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
+    long endTimeMs =
+        startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, Server.DEFAULT_SHUTDOWN_TIMEOUT_MS);
+    if (_serverConf
+        .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) {
       shutdownQueryCheck(endTimeMs);
     }
     _helixManager.disconnect();
     _serverInstance.shutDown();
-    if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
+    if (_serverConf
+        .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) {
       shutdownResourceCheck(endTimeMs);
     }
     _realtimeLuceneIndexRefreshState.stop();
@@ -472,8 +432,9 @@ public class HelixServerStarter implements ServiceStartable {
     LOGGER.info("Starting shutdown query check");
     long startTimeMs = System.currentTimeMillis();
 
-    long maxQueryTimeMs = _serverConf.getProperty(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
-    long noQueryThresholdMs = _serverConf.getProperty(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
+    long maxQueryTimeMs =
+        _serverConf.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    long noQueryThresholdMs = _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs);
 
     // Wait until no incoming queries
     boolean noIncomingQueries = false;
@@ -553,8 +514,8 @@ public class HelixServerStarter implements ServiceStartable {
         }
       }
 
-      long checkIntervalMs = _serverConf
-          .getProperty(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
+      long checkIntervalMs = _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS,
+          Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS);
       while (System.currentTimeMillis() < endTimeMs) {
         Iterator<String> iterator = resourcesToMonitor.iterator();
         String currentResource = null;
@@ -639,9 +600,9 @@ public class HelixServerStarter implements ServiceStartable {
       throws Exception {
     Map<String, Object> properties = new HashMap<>();
     int port = 8003;
-    properties.put(KEY_OF_SERVER_NETTY_PORT, port);
-    properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index");
-    properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar");
+    properties.put(Helix.KEY_OF_SERVER_NETTY_PORT, port);
+    properties.put(Server.CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index");
+    properties.put(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar");
 
     HelixServerStarter serverStarter =
         new HelixServerStarter("quickstart", "localhost:2191", new PinotConfiguration(properties));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
