Jackie-Jiang commented on a change in pull request #5717: URL: https://github.com/apache/incubator-pinot/pull/5717#discussion_r457748048
########## File path: pinot-core/pom.xml ########## @@ -227,5 +227,25 @@ <artifactId>lucene-analyzers-common</artifactId> <version>${lucene.version}</version> </dependency> + <dependency> Review comment: You don't need to import them again here as they are already imported in `pinot-common` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java ########## @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.query; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + +import java.util.ArrayList; +import java.util.List; + + +public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = "SelectionOnlyOperator"; + + private final IndexSegment _indexSegment; + private final TransformOperator _transformOperator; + private final List<ExpressionContext> _expressions; + private final BlockValSet[] _blockValSets; + private final DataSchema _dataSchema; + private final int _numRowsToKeep; + private final List<Object[]> _rows; + private final StreamObserver<Server.ServerResponse> _streamObserver; + + private int _numDocsScanned = 0; + + public StreamingSelectionOnlyOperator( + IndexSegment indexSegment, + QueryContext queryContext, + List<ExpressionContext> expressions, + TransformOperator transformOperator, + StreamObserver<Server.ServerResponse> streamObserver) { Review comment: Let's not 
pass the `streamObserver` to the segment level operator because multiple segments will be processed in parallel, but calls to `streamObserver.onNext()` need to be synchronized. We should create a `StreamingCombineOperator` (instance level operator) to keep fetching `IntermediateResultsBlock` from this operator and handle the calls to `streamObserver.onNext()`. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java ########## @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.plan; + +import io.grpc.stub.StreamObserver; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.EmptySelectionOperator; +import org.apache.pinot.core.operator.query.SelectionOnlyOperator; +import org.apache.pinot.core.operator.query.SelectionOrderByOperator; +import org.apache.pinot.core.operator.query.StreamingSelectionOnlyOperator; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.OrderByExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + +import java.util.ArrayList; +import java.util.List; + + +/** + * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a single segment. + */ +public class StreamingSelectionPlanNode implements PlanNode { + private final IndexSegment _indexSegment; + private final QueryContext _queryContext; + private final List<ExpressionContext> _expressions; + private final TransformPlanNode _transformPlanNode; + private final StreamObserver<Server.ServerResponse> _streamObserver; + + public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext queryContext, Review comment: Similarly here, don't pass the `streamObserver` to the segment-level plan node. 
Create a `StreamingCombinePlanNode` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer } } + @Override + public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, + ExecutorService executorService, + StreamObserver<Server.ServerResponse> streamObserver, long timeOutMs) { + List<PlanNode> planNodes = new ArrayList<>(indexSegments.size()); + for (IndexSegment indexSegment : indexSegments) { + planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, streamObserver)); + } + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit); + return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode)); + } + + @Override + public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext, Review comment: Don't pass `streamObserver` to the segment level `PlanNode` ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java ########## @@ -18,6 +18,57 @@ */ package org.apache.pinot.server.starter.grpc; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + public class PinotQueryService { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryService.class); + + private final int port; + private final Server server; + + // TODO: pass config, and parameters to initialize handler + public PinotQueryService(int port) { + this.port = port; + server = ServerBuilder.forPort(port) + .addService(new PinotQueryHandler()) + .build(); + + } + + /** Start serving requests. 
*/ + public void start() throws IOException { + server.start(); + LOGGER.info("Server started. Listening on {}", port); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { Review comment: Not sure if this shutdown hook is necessary ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java ########## @@ -18,18 +18,258 @@ */ package org.apache.pinot.server.starter.grpc; +import com.google.common.base.Preconditions; import io.grpc.stub.StreamObserver; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerQueryPhase; import org.apache.pinot.common.proto.PinotQueryServerGrpc; import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableImplV2; +import org.apache.pinot.core.common.datatable.DataTableUtils; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.SegmentDataManager; +import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.indexsegment.mutable.MutableSegment; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.config.QueryExecutorConfig; +import org.apache.pinot.core.query.exception.BadQueryRequestException; +import org.apache.pinot.core.query.pruner.SegmentPrunerService; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.request.context.QueryContext; +import 
org.apache.pinot.core.query.request.context.TimerContext; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.core.util.QueryOptions; +import org.apache.pinot.core.util.trace.TraceContext; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Handler for grpc server requests. * As data becomes available server responses will be added to the result stream. * Once the request is complete the client will aggregate the result metadata. */ public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class); + + private InstanceDataManager _instanceDataManager = null; + private SegmentPrunerService _segmentPrunerService = null; + private PlanMaker _planMaker = null; + private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; + private ServerMetrics _serverMetrics; + + public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager, + ServerMetrics serverMetrics) + throws ConfigurationException { + _instanceDataManager = instanceDataManager; + _serverMetrics = serverMetrics; + QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config); + if (queryExecutorConfig.getTimeOut() > 0) { + _defaultTimeOutMs = queryExecutorConfig.getTimeOut(); + } + LOGGER.info("Default timeout for query executor : {}", _defaultTimeOutMs); + LOGGER.info("Trying to build SegmentPrunerService"); + _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig()); + LOGGER.info("Trying to build QueryPlanMaker"); + _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig); + LOGGER.info("Trying to build QueryExecutorTimer"); + } + + /** + * Submit a streaming 
request to the server + * @param request + * @param responseObserver + */ @Override public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) { + // TODO: implement, follow up whether to use ServerQueryRequest + } + + public DataTable processQuery(ServerQueryRequest queryRequest, Review comment: This method should not return anything (we can directly implement `submit()`) ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java ########## @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.query; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + +import java.util.ArrayList; +import java.util.List; + + +public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = "SelectionOnlyOperator"; + + private final IndexSegment _indexSegment; + private final TransformOperator _transformOperator; + private final List<ExpressionContext> _expressions; + private final BlockValSet[] _blockValSets; + private final DataSchema _dataSchema; + private final int _numRowsToKeep; + private final List<Object[]> _rows; + private final StreamObserver<Server.ServerResponse> _streamObserver; + + private int _numDocsScanned = 0; + + public StreamingSelectionOnlyOperator( + IndexSegment indexSegment, + QueryContext queryContext, + List<ExpressionContext> expressions, + TransformOperator transformOperator, + StreamObserver<Server.ServerResponse> streamObserver) { + _indexSegment = 
indexSegment; + _transformOperator = transformOperator; + _expressions = expressions; + _streamObserver = streamObserver; + + int numExpressions = _expressions.size(); + _blockValSets = new BlockValSet[numExpressions]; + String[] columnNames = new String[numExpressions]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions]; + for (int i = 0; i < numExpressions; i++) { + ExpressionContext expression = _expressions.get(i); + TransformResultMetadata expressionMetadata = _transformOperator.getResultMetadata(expression); + columnNames[i] = expression.toString(); + columnDataTypes[i] = + DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue()); + } + _dataSchema = new DataSchema(columnNames, columnDataTypes); + + _numRowsToKeep = queryContext.getLimit(); + _rows = new ArrayList<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY)); + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + TransformBlock transformBlock; + while ((transformBlock = _transformOperator.nextBlock()) != null) { Review comment: For the streaming operator, each call should return the results from a single block (i.e. remove the while loop), and return `null` once all the blocks have been returned. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java ########## @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + +import java.util.ArrayList; +import java.util.List; + + +public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> { Review comment: Can you please import Pinot code style in `config/codestyle-intellij.xml` or `config/codestyle-eclipse.xml` and reformat the files? 
########## File path: pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java ########## @@ -139,6 +143,52 @@ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext quer } } + @Override + public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, + ExecutorService executorService, + StreamObserver<Server.ServerResponse> streamObserver, long timeOutMs) { + List<PlanNode> planNodes = new ArrayList<>(indexSegments.size()); + for (IndexSegment indexSegment : indexSegments) { + planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext, streamObserver)); + } + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit); + return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode)); + } + + @Override + public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext, + StreamObserver<Server.ServerResponse> streamObserver) { + if (QueryContextUtils.isAggregationQuery(queryContext)) { + // TODO: revisit, throw exception Review comment: For now, let's throw an exception for all queries other than selection-only queries ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java ########## @@ -18,18 +18,258 @@ */ package org.apache.pinot.server.starter.grpc; +import com.google.common.base.Preconditions; import io.grpc.stub.StreamObserver; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerQueryPhase; import org.apache.pinot.common.proto.PinotQueryServerGrpc; import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.DataTable; 
+import org.apache.pinot.core.common.datatable.DataTableImplV2; +import org.apache.pinot.core.common.datatable.DataTableUtils; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.SegmentDataManager; +import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.indexsegment.mutable.MutableSegment; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.config.QueryExecutorConfig; +import org.apache.pinot.core.query.exception.BadQueryRequestException; +import org.apache.pinot.core.query.pruner.SegmentPrunerService; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.TimerContext; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.core.util.QueryOptions; +import org.apache.pinot.core.util.trace.TraceContext; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Handler for grpc server requests. * As data becomes available server responses will be added to the result stream. * Once the request is complete the client will aggregate the result metadata. 
*/ public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryHandler.class); + + private InstanceDataManager _instanceDataManager = null; + private SegmentPrunerService _segmentPrunerService = null; + private PlanMaker _planMaker = null; + private long _defaultTimeOutMs = CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; + private ServerMetrics _serverMetrics; + + public synchronized void init(PinotConfiguration config, InstanceDataManager instanceDataManager, Review comment: It will be very hard to reuse the current `QueryScheduler` because that is not designed for streaming API. So for the first version we can just launch an `ExecutorService` within this class and use it to execute queries without introducing query scheduling. ########## File path: pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java ########## @@ -18,6 +18,57 @@ */ package org.apache.pinot.server.starter.grpc; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + public class PinotQueryService { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryService.class); + + private final int port; Review comment: Add `_` prefix for member variables ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/StreamingSelectionOnlyOperator.java ########## @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + +import java.util.ArrayList; +import java.util.List; + + +public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = "SelectionOnlyOperator"; Review comment: ```suggestion private static final String OPERATOR_NAME = "StreamingSelectionOnlyOperator"; ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org