Jackie-Jiang commented on a change in pull request #5717:
URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r457748048



##########
File path: pinot-core/pom.xml
##########
@@ -227,5 +227,25 @@
       <artifactId>lucene-analyzers-common</artifactId>
       <version>${lucene.version}</version>
     </dependency>
+    <dependency>

Review comment:
       You don't need to import them again here as they are already imported in 
`pinot-common`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final List<ExpressionContext> _expressions;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
+  private final int _numRowsToKeep;
+  private final List<Object[]> _rows;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  private int _numDocsScanned = 0;
+
+  public StreamingSelectionOnlyOperator(
+      IndexSegment indexSegment,
+      QueryContext queryContext,
+      List<ExpressionContext> expressions,
+      TransformOperator transformOperator,
+      StreamObserver<Server.ServerResponse> streamObserver) {

Review comment:
       Let's not pass the `streamObserver` to the segment level operator 
because multiple segments will be processed in parallel, and calls to 
`streamObserver.onNext()` need to be synchronized.
   We should create a `StreamingCombineOperator` (instance level operator) to 
keep fetching `IntermediateResultsBlock` from this operator and handle the 
calls to `streamObserver.onNext()`.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java
##########
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import 
org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.EmptySelectionOperator;
+import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
+import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
+import org.apache.pinot.core.operator.query.StreamingSelectionOnlyOperator;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The <code>SelectionPlanNode</code> class provides the execution plan for 
selection query on a single segment.
+ */
+public class StreamingSelectionPlanNode implements PlanNode {
+  private final IndexSegment _indexSegment;
+  private final QueryContext _queryContext;
+  private final List<ExpressionContext> _expressions;
+  private final TransformPlanNode _transformPlanNode;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext 
queryContext,

Review comment:
       Similarly here, don't pass the `streamObserver` to the segment-level 
plan node. Create a `StreamingCombinePlanNode`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment 
indexSegment, QueryContext quer
     }
   }
 
+  @Override
+  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, 
QueryContext queryContext,
+                               ExecutorService executorService,
+                               StreamObserver<Server.ServerResponse> 
streamObserver, long timeOutMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, 
streamObserver));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, executorService, 
timeOutMs, _numGroupsLimit);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, 
QueryContext queryContext,

Review comment:
       Don't pass `streamObserver` to the segment level `PlanNode`

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
##########
@@ -18,6 +18,57 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 public class PinotQueryService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotQueryService.class);
+
+  private final int port;
+  private final Server server;
+
+  // TODO: pass config, and parameters to initialize handler
+  public PinotQueryService(int port) {
+    this.port = port;
+    server = ServerBuilder.forPort(port)
+        .addService(new PinotQueryHandler())
+        .build();
+
+  }
+
+  /** Start serving requests. */
+  public void start() throws IOException {
+    server.start();
+    LOGGER.info("Server started. Listening on {}", port);
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {

Review comment:
       Not sure if this shutdown hook is necessary

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result 
stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends 
PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = 
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager,
+                                ServerMetrics serverMetrics)
+      throws ConfigurationException {
+    _instanceDataManager = instanceDataManager;
+    _serverMetrics = serverMetrics;
+    QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
+    if (queryExecutorConfig.getTimeOut() > 0) {
+      _defaultTimeOutMs = queryExecutorConfig.getTimeOut();
+    }
+    LOGGER.info("Default timeout for query executor : {}", _defaultTimeOutMs);
+    LOGGER.info("Trying to build SegmentPrunerService");
+    _segmentPrunerService = new 
SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
+    LOGGER.info("Trying to build QueryPlanMaker");
+    _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig);
+    LOGGER.info("Trying to build QueryExecutorTimer");
+  }
+
+  /**
+   * Submit a streaming request to the server
+   * @param request
+   * @param responseObserver
+   */
   @Override
   public void submit(Server.ServerRequest request, 
StreamObserver<Server.ServerResponse> responseObserver) {
+    // TODO: implement, follow up whether to use ServerQueryRequest
+  }
+
+  public DataTable processQuery(ServerQueryRequest queryRequest,

Review comment:
       This method should not return anything (we can directly implement 
`submit()`)

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final List<ExpressionContext> _expressions;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
+  private final int _numRowsToKeep;
+  private final List<Object[]> _rows;
+  private final StreamObserver<Server.ServerResponse> _streamObserver;
+
+  private int _numDocsScanned = 0;
+
+  public StreamingSelectionOnlyOperator(
+      IndexSegment indexSegment,
+      QueryContext queryContext,
+      List<ExpressionContext> expressions,
+      TransformOperator transformOperator,
+      StreamObserver<Server.ServerResponse> streamObserver) {
+    _indexSegment = indexSegment;
+    _transformOperator = transformOperator;
+    _expressions = expressions;
+    _streamObserver = streamObserver;
+
+    int numExpressions = _expressions.size();
+    _blockValSets = new BlockValSet[numExpressions];
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      TransformResultMetadata expressionMetadata = 
_transformOperator.getResultMetadata(expression);
+      columnNames[i] = expression.toString();
+      columnDataTypes[i] =
+          
DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), 
expressionMetadata.isSingleValue());
+    }
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    _numRowsToKeep = queryContext.getLimit();
+    _rows = new ArrayList<>(Math.min(_numRowsToKeep, 
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {

Review comment:
       For the streaming operator, return the results of one block per call 
(i.e. remove the while loop), and return `null` once all blocks have been returned.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock> {

Review comment:
       Can you please import Pinot code style in 
`config/codestyle-intellij.xml` or `config/codestyle-eclipse.xml` and reformat 
the files?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
##########
@@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment 
indexSegment, QueryContext quer
     }
   }
 
+  @Override
+  public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, 
QueryContext queryContext,
+                               ExecutorService executorService,
+                               StreamObserver<Server.ServerResponse> 
streamObserver, long timeOutMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, 
streamObserver));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext, executorService, 
timeOutMs, _numGroupsLimit);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, 
QueryContext queryContext,
+                                               
StreamObserver<Server.ServerResponse> streamObserver) {
+    if (QueryContextUtils.isAggregationQuery(queryContext)) {
+      // TODO: revisit, throw exception

Review comment:
       Let's throw an exception for all queries other than selection-only for now

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java
##########
@@ -18,18 +18,258 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.common.datatable.DataTableUtils;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
+import org.apache.pinot.core.plan.Plan;
+import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
+import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.pruner.SegmentPrunerService;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.core.util.trace.TraceContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Handler for grpc server requests.
  * As data becomes available server responses will be added to the result 
stream.
  * Once the request is complete the client will aggregate the result metadata.
  */
 public class PinotQueryHandler extends 
PinotQueryServerGrpc.PinotQueryServerImplBase {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotQueryHandler.class);
+
+  private InstanceDataManager _instanceDataManager = null;
+  private SegmentPrunerService _segmentPrunerService = null;
+  private PlanMaker _planMaker = null;
+  private long _defaultTimeOutMs = 
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+  private ServerMetrics _serverMetrics;
+
+  public synchronized void init(PinotConfiguration config, InstanceDataManager 
instanceDataManager,

Review comment:
       It will be very hard to reuse the current `QueryScheduler` because it 
is not designed for the streaming API. So for the first version we can just launch 
an `ExecutorService` within this class and use it to execute queries without 
introducing query scheduling.

##########
File path: 
pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java
##########
@@ -18,6 +18,57 @@
  */
 package org.apache.pinot.server.starter.grpc;
 
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 public class PinotQueryService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotQueryService.class);
+
+  private final int port;

Review comment:
       Add `_` prefix for member variables

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class StreamingSelectionOnlyOperator extends 
BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";

Review comment:
       ```suggestion
     private static final String OPERATOR_NAME = 
"StreamingSelectionOnlyOperator";
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to