This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new f88a275  Add streaming query handler (#5717)
f88a275 is described below

commit f88a2759448b8d9eaeeb1b33b894e24df852b9c2
Author: Elon Azoulay <elon.azou...@gmail.com>
AuthorDate: Tue Sep 8 18:12:49 2020 -0700

    Add streaming query handler (#5717)

    Add a streaming query handler and a gRPC query server to stream the query responses.
    Currently only streaming selection-only queries are supported.
    Added `GrpcRequestBuilder` and `GrpcQueryClient` to help build the request and query the server.

    Co-authored-by: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
---
 .../pinot/common/exception/QueryException.java     |   4 +
 .../apache/pinot/common/utils/CommonConstants.java |  39 ++-
 .../pinot/common/utils/grpc/GrpcQueryClient.java   |  26 +-
 .../common/utils/grpc/GrpcRequestBuilder.java      | 106 +++++++
 .../operator/blocks/IntermediateResultsBlock.java  |  11 +-
 .../operator/streaming/StreamingResponseUtils.java |  52 ++++
 .../StreamingSelectionOnlyCombineOperator.java     | 189 ++++++++++++
 .../streaming/StreamingSelectionOnlyOperator.java  | 111 +++++++
 .../apache/pinot/core/plan/CombinePlanNode.java    |  14 +-
 .../core/plan/StreamingSelectionPlanNode.java      |  55 ++++
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  27 +-
 .../apache/pinot/core/plan/maker/PlanMaker.java    |  15 +
 .../core/query/executor/GrpcQueryExecutor.java     | 327 +++++++++++++++++++++
 .../core/query/request/ServerQueryRequest.java     |  69 ++++-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  38 ++-
 .../combine/SelectionCombineOperatorTest.java      |   2 +-
 .../pinot/core/plan/CombinePlanNodeTest.java       |   6 +-
 .../MultiNodesOfflineClusterIntegrationTest.java   |  14 +
 .../tests/OfflineClusterIntegrationTest.java       |  98 +++++-
 .../org/apache/pinot/server/conf/ServerConf.java   |  22 +-
 .../pinot/server/starter/ServerInstance.java       |  41 ++-
 .../server/starter/grpc/PinotQueryService.java     |  23 --
 .../server/starter/helix/HelixServerStarter.java   | 171 +++++------
 23 files changed, 1255 insertions(+), 205 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 89dda00..10a1544 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -45,6 +45,7 @@ public class QueryException {
   // TODO: Handle these errors in broker
   public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
   public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
+  public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
   public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
   public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
   public static final int BROKER_GATHER_ERROR_CODE = 300;
@@ -76,6 +77,8 @@ public class QueryException {
       new ProcessingException(SERVER_SHUTTING_DOWN_ERROR_CODE);
   public static final ProcessingException SERVER_OUT_OF_CAPACITY_ERROR =
       new ProcessingException(SERVER_OUT_OF_CAPACITY_ERROR_CODE);
+  public static final ProcessingException SERVER_TABLE_MISSING_ERROR =
+      new ProcessingException(SERVER_TABLE_MISSING_ERROR_CODE);
   public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
       new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
   public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
@@ -108,6 +111,7 @@ public class QueryException {
     QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown"); SERVER_OUT_OF_CAPACITY_ERROR.setMessage("ServerOutOfCapacity"); + SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing"); QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError"); EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError"); BROKER_GATHER_ERROR.setMessage("BrokerGatherError"); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index e116849..ac3a796 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -105,6 +105,7 @@ public class CommonConstants { public static final String INSTANCE_ID_KEY = "instanceId"; public static final String DATA_DIR_KEY = "dataDir"; public static final String ADMIN_PORT_KEY = "adminPort"; + public static final String GRPC_PORT_KEY = "grpcPort"; } public static final String SET_INSTANCE_ID_TO_HOSTNAME_KEY = "pinot.set.instance.id.to.hostname"; @@ -190,7 +191,12 @@ public class CommonConstants { public static final String CONFIG_OF_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class"; public static final String CONFIG_OF_REQUEST_HANDLER_FACTORY_CLASS = "pinot.server.requestHandlerFactory.class"; public static final String CONFIG_OF_NETTY_PORT = "pinot.server.netty.port"; + public static final String CONFIG_OF_ENABLE_GRPC_SERVER = "pinot.server.grpc.enable"; + public static final boolean DEFAULT_ENABLE_GRPC_SERVER = false; + public static final String CONFIG_OF_GRPC_PORT = "pinot.server.grpc.port"; + public static final int DEFAULT_GRPC_PORT = 8090; public static final String CONFIG_OF_ADMIN_API_PORT = "pinot.server.adminapi.port"; + public static final int DEFAULT_ADMIN_API_PORT = 8097; public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version"; public static final String CONFIG_OF_ENABLE_SPLIT_COMMIT = "pinot.server.instance.enable.split.commit"; @@ -211,7 +217,6 @@ public class CommonConstants { "pinot.server.starter.realtimeConsumptionCatchupWaitMs"; public static final int DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS = 0; - public static final int DEFAULT_ADMIN_API_PORT = 8097; public static final String DEFAULT_READ_MODE = "mmap"; // Whether to reload consuming segment on scheme update. 
Will change default behavior to true when this feature is stabilized public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = false; @@ -387,4 +392,36 @@ public class CommonConstants { @Deprecated public static final String TABLE_NAME = "segment.table.name"; } + + public static class Query { + public static class Request { + public static class MetadataKeys { + public static final String REQUEST_ID = "requestId"; + public static final String BROKER_ID = "brokerId"; + public static final String ENABLE_TRACE = "enableTrace"; + public static final String ENABLE_STREAMING = "enableStreaming"; + public static final String PAYLOAD_TYPE = "payloadType"; + } + + public static class PayloadType { + public static final String SQL = "sql"; + public static final String BROKER_REQUEST = "brokerRequest"; + } + } + + public static class Response { + public static class MetadataKeys { + public static final String RESPONSE_TYPE = "responseType"; + } + + public static class ResponseType { + // For streaming response, multiple (could be 0 if no data should be returned, or query encounters exception) + // data responses will be returned, followed by one single metadata response + public static final String DATA = "data"; + public static final String METADATA = "metadata"; + // For non-streaming response + public static final String NON_STREAMING = "nonStreaming"; + } + } + } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java similarity index 58% rename from pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java rename to pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java index 23cff7ad..68c03b8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryHandler.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java @@ -16,20 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.starter.grpc; +package org.apache.pinot.common.utils.grpc; -import io.grpc.stub.StreamObserver; +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import java.util.Iterator; import org.apache.pinot.common.proto.PinotQueryServerGrpc; import org.apache.pinot.common.proto.Server; -/** - * Handler for grpc server requests. - * As data becomes available server responses will be added to the result stream. - * Once the request is complete the client will aggregate the result metadata. 
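For reference, a server would opt in to the new gRPC query endpoint through the two config keys added above; the gRPC server is disabled by default, and 8090 is the default port from CommonConstants:

    pinot.server.grpc.enable=true
    pinot.server.grpc.port=8090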
- */
-public class PinotQueryHandler extends PinotQueryServerGrpc.PinotQueryServerImplBase {
-  @Override
-  public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) {
+public class GrpcQueryClient {
+  private final PinotQueryServerGrpc.PinotQueryServerBlockingStub _blockingStub;
+
+  public GrpcQueryClient(String host, int port) {
+    // Set max message size to 128MB
+    Channel channel =
+        ManagedChannelBuilder.forAddress(host, port).maxInboundMessageSize(128 * 1024 * 1024).usePlaintext().build();
+    _blockingStub = PinotQueryServerGrpc.newBlockingStub(channel);
+  }
+
+  public Iterator<Server.ServerResponse> submit(Server.ServerRequest request) {
+    return _blockingStub.submit(request);
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
new file mode 100644
index 0000000..0d6daeb
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils.grpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Query.Request;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+
+public class GrpcRequestBuilder {
+  private int _requestId;
+  private String _brokerId = "unknown";
+  private boolean _enableTrace;
+  private boolean _enableStreaming;
+  private String _payloadType;
+  private String _sql;
+  private BrokerRequest _brokerRequest;
+  private List<String> _segments;
+
+  public GrpcRequestBuilder setRequestId(int requestId) {
+    _requestId = requestId;
+    return this;
+  }
+
+  public GrpcRequestBuilder setBrokerId(String brokerId) {
+    _brokerId = brokerId;
+    return this;
+  }
+
+  public GrpcRequestBuilder setEnableTrace(boolean enableTrace) {
+    _enableTrace = enableTrace;
+    return this;
+  }
+
+  public GrpcRequestBuilder setEnableStreaming(boolean enableStreaming) {
+    _enableStreaming = enableStreaming;
+    return this;
+  }
+
+  public GrpcRequestBuilder setSql(String sql) {
+    _payloadType = Request.PayloadType.SQL;
+    _sql = sql;
+    return this;
+  }
+
+  public GrpcRequestBuilder setBrokerRequest(BrokerRequest brokerRequest) {
+    _payloadType = Request.PayloadType.BROKER_REQUEST;
+    _brokerRequest = brokerRequest;
+    return this;
+  }
+
+  public GrpcRequestBuilder setSegments(List<String> segments) {
+    _segments = segments;
+    return this;
+  }
+
+  public Server.ServerRequest build() {
+    Preconditions.checkState(_payloadType != null && CollectionUtils.isNotEmpty(_segments),
+        "Query and segmentsToQuery must be set");
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put(Request.MetadataKeys.REQUEST_ID, Integer.toString(_requestId));
+    metadata.put(Request.MetadataKeys.BROKER_ID, _brokerId);
+    metadata.put(Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(_enableTrace));
+    metadata.put(Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(_enableStreaming));
+    metadata.put(Request.MetadataKeys.PAYLOAD_TYPE, _payloadType);
+
+    if (_payloadType.equals(Request.PayloadType.SQL)) {
+      return Server.ServerRequest.newBuilder().putAllMetadata(metadata).setSql(_sql).addAllSegments(_segments).build();
+    } else {
+      byte[] payLoad;
+      try {
+        payLoad = new TSerializer(new TCompactProtocol.Factory()).serialize(_brokerRequest);
+      } catch (TException e) {
+        throw new RuntimeException("Caught exception while serializing broker request: " + _brokerRequest, e);
+      }
+      return Server.ServerRequest.newBuilder().putAllMetadata(metadata).setPayload(ByteString.copyFrom(payLoad))
+          .addAllSegments(_segments).build();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index d0eb2e2..79ac520 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -67,6 +67,9 @@ public class IntermediateResultsBlock implements Block {
   private Table _table;
 
+  public IntermediateResultsBlock() {
+  }
+
   /**
    * Constructor for selection result.
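To make the two helpers above concrete, here is a minimal client-side sketch; the host, port, request id, SQL and segment names are hypothetical, and it assumes a server running with the gRPC query server enabled (a sketch of consuming the streamed responses follows after StreamingResponseUtils below):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.pinot.common.proto.Server;
    import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
    import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;

    public class StreamingQueryClientExample {
      public static void main(String[] args) {
        // Connect to the server's gRPC query port (8090 is the default)
        GrpcQueryClient client = new GrpcQueryClient("localhost", 8090);
        // Build a streaming request for a selection-only query over a hypothetical table
        Server.ServerRequest request = new GrpcRequestBuilder()
            .setRequestId(123)
            .setBrokerId("broker_0")
            .setEnableStreaming(true)
            .setSql("SELECT col1, col2 FROM myTable LIMIT 10000")
            .setSegments(Arrays.asList("myTable_OFFLINE_0", "myTable_OFFLINE_1"))
            .build();
        // The blocking stub hands responses back one by one as the server streams them
        Iterator<Server.ServerResponse> responses = client.submit(request);
      }
    }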
   */
@@ -270,11 +273,7 @@ public class IntermediateResultsBlock implements Block {
       return getAggregationGroupByResultDataTable();
     }
 
-    if (_processingExceptions != null && _processingExceptions.size() > 0) {
-      return getProcessingExceptionsDataTable();
-    }
-
-    throw new UnsupportedOperationException("No data inside IntermediateResultsBlock.");
+    return getMetadataDataTable();
   }
 
   private DataTable getResultDataTable()
@@ -405,7 +404,7 @@ public class IntermediateResultsBlock implements Block {
     return attachMetadataToDataTable(dataTable);
   }
 
-  private DataTable getProcessingExceptionsDataTable() {
+  private DataTable getMetadataDataTable() {
     return attachMetadataToDataTable(new DataTableImplV2());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
new file mode 100644
index 0000000..3330342
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingResponseUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.streaming;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.CommonConstants.Query.Response;
+import org.apache.pinot.common.utils.DataTable;
+
+
+public class StreamingResponseUtils {
+  private StreamingResponseUtils() {
+  }
+
+  public static Server.ServerResponse getDataResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.DATA);
+  }
+
+  public static Server.ServerResponse getMetadataResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.METADATA);
+  }
+
+  public static Server.ServerResponse getNonStreamingResponse(DataTable dataTable)
+      throws IOException {
+    return getResponse(dataTable, Response.ResponseType.NON_STREAMING);
+  }
+
+  private static Server.ServerResponse getResponse(DataTable dataTable, String responseType)
+      throws IOException {
+    return Server.ServerResponse.newBuilder().putMetadata(Response.MetadataKeys.RESPONSE_TYPE, responseType)
+        .setPayload(ByteString.copyFrom(dataTable.toBytes())).build();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
new file mode 100644
index 0000000..619bcc5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyCombineOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
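Given the response types above, a streaming client can branch on the responseType metadata entry to tell the 0..n data responses from the single trailing metadata response. A rough sketch, assuming DataTableFactory as the deserialization helper for the payload:

    import java.util.Iterator;
    import org.apache.pinot.common.proto.Server;
    import org.apache.pinot.common.utils.CommonConstants.Query.Response;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.common.datatable.DataTableFactory;

    public class StreamingResponseConsumer {
      public static void consume(Iterator<Server.ServerResponse> responses) throws Exception {
        while (responses.hasNext()) {
          Server.ServerResponse response = responses.next();
          String responseType = response.getMetadataMap().get(Response.MetadataKeys.RESPONSE_TYPE);
          // The payload is a serialized DataTable for both response types
          DataTable dataTable = DataTableFactory.getDataTable(response.getPayload().asReadOnlyByteBuffer());
          if (Response.ResponseType.DATA.equals(responseType)) {
            System.out.println("Data block with " + dataTable.getNumberOfRows() + " rows");
          } else {
            // The metadata response (execution stats, exceptions) closes the stream
            System.out.println("Metadata: " + dataTable.getMetadata());
          }
        }
      }
    }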
+ */ +package org.apache.pinot.core.operator.streaming; + +import io.grpc.stub.StreamObserver; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.combine.CombineOperatorUtils; +import org.apache.pinot.core.query.exception.EarlyTerminationException; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.core.util.trace.TraceRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for selection only streaming queries. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class StreamingSelectionOnlyCombineOperator extends BaseOperator<IntermediateResultsBlock> { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingSelectionOnlyCombineOperator.class); + private static final String OPERATOR_NAME = "StreamingSelectionOnlyCombineOperator"; + + // Special IntermediateResultsBlock to indicate that this is the last results block for an operator + private static final IntermediateResultsBlock LAST_RESULTS_BLOCK = + new IntermediateResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), + Collections.emptyList()); + + private final List<Operator> _operators; + private final QueryContext _queryContext; + private final ExecutorService _executorService; + private final long _endTimeMs; + private final StreamObserver<Server.ServerResponse> _streamObserver; + private final int _limit; + + public StreamingSelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService, long endTimeMs, StreamObserver<Server.ServerResponse> streamObserver) { + _operators = operators; + _queryContext = queryContext; + _executorService = executorService; + _endTimeMs = endTimeMs; + _streamObserver = streamObserver; + _limit = queryContext.getLimit(); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + protected IntermediateResultsBlock getNextBlock() { + int numOperators = _operators.size(); + int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators); + + // Use a BlockingQueue to store all the results blocks + BlockingQueue<IntermediateResultsBlock> blockingQueue = new LinkedBlockingQueue<>(); + // Use a Phaser to ensure all the Futures are done (not scheduled, finished or interrupted) before the main thread + // returns. We need to ensure this because the main thread holds the reference to the segments. If a segment is + // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined + // behavior (even JVM crash) when processing queries against it. 
+ Phaser phaser = new Phaser(1); + + Future[] futures = new Future[numThreads]; + for (int i = 0; i < numThreads; i++) { + int threadIndex = i; + futures[i] = _executorService.submit(new TraceRunnable() { + @Override + public void runJob() { + try { + // Register the thread to the phaser + // NOTE: If the phaser is terminated (returning negative value) when trying to register the thread, that + // means the query execution has finished, and the main thread has deregistered itself and returned + // the result. Directly return as no execution result will be taken. + if (phaser.register() < 0) { + return; + } + + int numRowsCollected = 0; + for (int operatorIndex = threadIndex; operatorIndex < numOperators; operatorIndex += numThreads) { + Operator<IntermediateResultsBlock> operator = _operators.get(operatorIndex); + try { + IntermediateResultsBlock resultsBlock; + while ((resultsBlock = operator.nextBlock()) != null) { + Collection<Object[]> rows = resultsBlock.getSelectionResult(); + assert rows != null; + numRowsCollected += rows.size(); + blockingQueue.offer(resultsBlock); + if (numRowsCollected >= _limit) { + return; + } + } + blockingQueue.offer(LAST_RESULTS_BLOCK); + } catch (EarlyTerminationException e) { + // Early-terminated by interruption (canceled by the main thread) + return; + } catch (Exception e) { + // Caught exception, skip processing the remaining operators + LOGGER.error("Caught exception while executing operator of index: {} (query: {})", operatorIndex, + _queryContext, e); + blockingQueue.offer(new IntermediateResultsBlock(e)); + return; + } + } + } finally { + phaser.arriveAndDeregister(); + } + } + }); + } + + try { + int numRowsCollected = 0; + int numOperatorsFinished = 0; + while (numRowsCollected < _limit && numOperatorsFinished < numOperators) { + IntermediateResultsBlock resultsBlock = + blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (resultsBlock == null) { + // Query times out, skip streaming the remaining results blocks + LOGGER.error("Timed out while polling results block (query: {})", _queryContext); + return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); + } + if (resultsBlock.getProcessingExceptions() != null) { + // Caught exception while processing segment, skip streaming the remaining results blocks and directly return + // the exception + return resultsBlock; + } + if (resultsBlock == LAST_RESULTS_BLOCK) { + numOperatorsFinished++; + continue; + } + DataSchema dataSchema = resultsBlock.getDataSchema(); + Collection<Object[]> rows = resultsBlock.getSelectionResult(); + assert dataSchema != null && rows != null; + numRowsCollected += rows.size(); + DataTable dataTable = SelectionOperatorUtils.getDataTableFromRows(rows, dataSchema); + _streamObserver.onNext(StreamingResponseUtils.getDataResponse(dataTable)); + } + IntermediateResultsBlock metadataBlock = new IntermediateResultsBlock(); + CombineOperatorUtils.setExecutionStatistics(metadataBlock, _operators); + return metadataBlock; + } catch (Exception e) { + LOGGER.error("Caught exception while streaming results blocks (query: {})", _queryContext, e); + return new IntermediateResultsBlock(QueryException.INTERNAL_ERROR, e); + } finally { + // Cancel all ongoing jobs + for (Future future : futures) { + if (!future.isDone()) { + future.cancel(true); + } + } + // Deregister the main thread and wait for all threads done + 
phaser.awaitAdvance(phaser.arriveAndDeregister()); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java new file mode 100644 index 0000000..4bb991d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingSelectionOnlyOperator.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.streaming; + +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.RowBasedBlockValueFetcher; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.blocks.TransformBlock; +import org.apache.pinot.core.operator.transform.TransformOperator; +import org.apache.pinot.core.operator.transform.TransformResultMetadata; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; + + +public class StreamingSelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> { + private static final String OPERATOR_NAME = "StreamingSelectionOnlyOperator"; + + private final IndexSegment _indexSegment; + private final TransformOperator _transformOperator; + private final List<ExpressionContext> _expressions; + private final BlockValSet[] _blockValSets; + private final DataSchema _dataSchema; + private final int _limit; + + private int _numDocsScanned = 0; + + public StreamingSelectionOnlyOperator(IndexSegment indexSegment, QueryContext queryContext, + List<ExpressionContext> expressions, TransformOperator transformOperator) { + _indexSegment = indexSegment; + _transformOperator = transformOperator; + _expressions = expressions; + + int numExpressions = expressions.size(); + _blockValSets = new BlockValSet[numExpressions]; + String[] columnNames = new String[numExpressions]; + DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions]; + for (int i = 0; i < numExpressions; i++) { + ExpressionContext expression = expressions.get(i); + TransformResultMetadata expressionMetadata = transformOperator.getResultMetadata(expression); + columnNames[i] = expression.toString(); + columnDataTypes[i] = + DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue()); + } + 
_dataSchema = new DataSchema(columnNames, columnDataTypes); + + _limit = queryContext.getLimit(); + } + + @Nullable + @Override + protected IntermediateResultsBlock getNextBlock() { + if (_numDocsScanned >= _limit) { + // Already returned enough documents + return null; + } + TransformBlock transformBlock = _transformOperator.nextBlock(); + if (transformBlock == null) { + return null; + } + int numExpressions = _expressions.size(); + for (int i = 0; i < numExpressions; i++) { + _blockValSets[i] = transformBlock.getBlockValueSet(_expressions.get(i)); + } + RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(_blockValSets); + int numDocs = transformBlock.getNumDocs(); + int numDocsToReturn = Math.min(_limit - _numDocsScanned, numDocs); + List<Object[]> rows = new ArrayList<>(numDocsToReturn); + for (int i = 0; i < numDocsToReturn; i++) { + rows.add(blockValueFetcher.getRow(i)); + } + _numDocsScanned += numDocs; + return new IntermediateResultsBlock(_dataSchema, rows); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected(); + int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs(); + return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, + numTotalDocs); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java index b6d9155..0bce7c0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.plan; +import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -25,6 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator; @@ -32,6 +35,7 @@ import org.apache.pinot.core.operator.combine.GroupByCombineOperator; import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator; import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator; import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator; +import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator; import org.apache.pinot.core.query.exception.BadQueryRequestException; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; @@ -56,6 +60,7 @@ public class CombinePlanNode implements PlanNode { private final ExecutorService _executorService; private final long _endTimeMs; private final int _numGroupsLimit; + private final StreamObserver<Server.ServerResponse> _streamObserver; /** * Constructor for the class. 
@@ -65,14 +70,16 @@ public class CombinePlanNode implements PlanNode { * @param executorService Executor service * @param endTimeMs End time in milliseconds for the query * @param numGroupsLimit Limit of number of groups stored in each segment + * @param streamObserver Optional stream observer for streaming query */ public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService, - long endTimeMs, int numGroupsLimit) { + long endTimeMs, int numGroupsLimit, @Nullable StreamObserver<Server.ServerResponse> streamObserver) { _planNodes = planNodes; _queryContext = queryContext; _executorService = executorService; _endTimeMs = endTimeMs; _numGroupsLimit = numGroupsLimit; + _streamObserver = streamObserver; } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -155,6 +162,11 @@ public class CombinePlanNode implements PlanNode { } } + if (_streamObserver != null) { + // Streaming query (only support selection only) + return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs, + _streamObserver); + } if (QueryContextUtils.isAggregationQuery(_queryContext)) { if (_queryContext.getGroupByExpressions() == null) { // Aggregation only diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java new file mode 100644 index 0000000..1ae28be --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingSelectionPlanNode.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.plan; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyOperator; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; + + +/** + * The {@code StreamingSelectionPlanNode} class provides the execution plan for streaming selection query on a single + * segment. + * <p>NOTE: ORDER-BY is ignored for streaming selection query. 
+ */ +public class StreamingSelectionPlanNode implements PlanNode { + private final IndexSegment _indexSegment; + private final QueryContext _queryContext; + private final List<ExpressionContext> _expressions; + private final TransformPlanNode _transformPlanNode; + + public StreamingSelectionPlanNode(IndexSegment indexSegment, QueryContext queryContext) { + Preconditions + .checkState(queryContext.getOrderByExpressions() == null, "Selection order-by is not supported for streaming"); + _indexSegment = indexSegment; + _queryContext = queryContext; + _expressions = SelectionOperatorUtils.extractExpressions(queryContext, indexSegment); + _transformPlanNode = new TransformPlanNode(_indexSegment, queryContext, _expressions, + Math.min(queryContext.getLimit(), DocIdSetPlanNode.MAX_DOC_PER_CALL)); + } + + @Override + public StreamingSelectionOnlyOperator run() { + return new StreamingSelectionOnlyOperator(_indexSegment, _queryContext, _expressions, _transformPlanNode.run()); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 746b08a..5f6ea37 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -20,10 +20,12 @@ package org.apache.pinot.core.plan.maker; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.pinot.common.function.AggregationFunctionType; +import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode; import org.apache.pinot.core.plan.AggregationGroupByPlanNode; @@ -36,6 +38,7 @@ import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode; import org.apache.pinot.core.plan.Plan; import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.core.plan.SelectionPlanNode; +import org.apache.pinot.core.plan.StreamingSelectionPlanNode; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.config.QueryExecutorConfig; import org.apache.pinot.core.query.request.context.ExpressionContext; @@ -103,7 +106,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { planNodes.add(makeSegmentPlanNode(indexSegment, queryContext)); } CombinePlanNode combinePlanNode = - new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit); + new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit, null); return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode)); } @@ -139,6 +142,28 @@ public class InstancePlanMakerImplV2 implements PlanMaker { } } + @Override + public Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, + ExecutorService executorService, StreamObserver<Server.ServerResponse> streamObserver, long endTimeMs) { + List<PlanNode> planNodes = new ArrayList<>(indexSegments.size()); + for (IndexSegment indexSegment : indexSegments) { + planNodes.add(makeStreamingSegmentPlanNode(indexSegment, queryContext)); + } + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, 
_numGroupsLimit, streamObserver); + return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode)); + } + + @Override + public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) { + if (QueryContextUtils.isAggregationQuery(queryContext)) { + throw new UnsupportedOperationException("Queries with aggregations are not supported"); + } else { + // Selection query + return new StreamingSelectionPlanNode(indexSegment, queryContext); + } + } + /** * Returns {@code true} if the given aggregation-only without filter QueryContext can be solved with segment metadata, * {@code false} otherwise. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java index b4a316d..9d5411a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.core.plan.maker; +import io.grpc.stub.StreamObserver; import java.util.List; import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.plan.Plan; import org.apache.pinot.core.plan.PlanNode; @@ -43,4 +45,17 @@ public interface PlanMaker { * Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment. */ PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext); + + /** + * Returns an instance level {@link Plan} for a streaming query which contains the logical execution plan for multiple + * segments. + */ + Plan makeStreamingInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, + ExecutorService executorService, StreamObserver<Server.ServerResponse> streamObserver, long endTimeMs); + + /** + * Returns a segment level {@link PlanNode} for a streaming query which contains the logical execution plan for one + * segment. + */ + PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java new file mode 100644 index 0000000..e64fd34 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/GrpcQueryExecutor.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
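To make the supported query surface concrete: with the plan maker above, a streaming request for a selection-only query such as SELECT col1, col2 FROM myTable LIMIT 10000 is served by StreamingSelectionPlanNode, while an aggregation such as SELECT COUNT(*) FROM myTable hits the UnsupportedOperationException in makeStreamingSegmentPlanNode, and a selection with ORDER BY fails the order-by precondition in StreamingSelectionPlanNode.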
+ */ +package org.apache.pinot.core.query.executor; + +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerQueryPhase; +import org.apache.pinot.common.proto.PinotQueryServerGrpc; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.datatable.DataTableImplV2; +import org.apache.pinot.core.common.datatable.DataTableUtils; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.SegmentDataManager; +import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.indexsegment.mutable.MutableSegment; +import org.apache.pinot.core.operator.streaming.StreamingResponseUtils; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.config.QueryExecutorConfig; +import org.apache.pinot.core.query.exception.BadQueryRequestException; +import org.apache.pinot.core.query.pruner.SegmentPrunerService; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.TimerContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.core.util.QueryOptions; +import org.apache.pinot.core.util.trace.TraceContext; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Query executor for gRPC server requests. + * <ul> + * <li> + * For streaming request, multiple (could be 0 if no data should be returned, or query encounters exception) data + * responses will be returned, followed by one single metadata response. + * </li> + * <li> + * For non-streaming request, one single response containing both data and metadata will be returned. 
+ * </li> + * </ul> + * TODO: Plug in QueryScheduler + */ +public class GrpcQueryExecutor extends PinotQueryServerGrpc.PinotQueryServerImplBase { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryExecutor.class); + + private final InstanceDataManager _instanceDataManager; + private final ServerMetrics _serverMetrics; + private final long _defaultTimeOutMs; + private final SegmentPrunerService _segmentPrunerService; + private final PlanMaker _planMaker; + private final ExecutorService _executorService = + Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS); + + public GrpcQueryExecutor(PinotConfiguration config, InstanceDataManager instanceDataManager, + ServerMetrics serverMetrics) + throws ConfigurationException { + _instanceDataManager = instanceDataManager; + _serverMetrics = serverMetrics; + QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config); + long defaultTimeoutMs = queryExecutorConfig.getTimeOut(); + _defaultTimeOutMs = + defaultTimeoutMs > 0 ? defaultTimeoutMs : CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; + _segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig()); + _planMaker = new InstancePlanMakerImplV2(queryExecutorConfig); + LOGGER.info("Initialized PinotQueryHandler with default timeout: {}ms, numWorkerThreads: {}", _defaultTimeOutMs, + ResourceManager.DEFAULT_QUERY_WORKER_THREADS); + } + + @Override + public void submit(Server.ServerRequest request, StreamObserver<Server.ServerResponse> responseObserver) { + // Deserialize the request + ServerQueryRequest queryRequest; + try { + queryRequest = new ServerQueryRequest(request, _serverMetrics); + } catch (Exception e) { + LOGGER.error("Caught exception while deserializing the request: {}", request, e); + _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1); + responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException()); + return; + } + + // Process the query + try { + processQuery(queryRequest, responseObserver); + } catch (Exception e) { + LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(), + queryRequest.getQueryContext(), queryRequest.getBrokerId(), e); + _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1); + responseObserver.onError(Status.INTERNAL.withCause(e).asException()); + } + } + + private void processQuery(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver) { + TimerContext timerContext = queryRequest.getTimerContext(); + TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT); + if (schedulerWaitTimer != null) { + schedulerWaitTimer.stopAndRecord(); + } + long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs(); + long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; + TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING); + + long requestId = queryRequest.getRequestId(); + String tableNameWithType = queryRequest.getTableNameWithType(); + QueryContext queryContext = queryRequest.getQueryContext(); + LOGGER.debug("Incoming request Id: {}, query: {}", requestId, queryContext); + // Use the timeout passed from the request if exists, or the instance-level timeout + long queryTimeoutMs = _defaultTimeOutMs; + Map<String, String> queryOptions = queryContext.getQueryOptions(); + if 
(queryOptions != null) { + Long timeoutFromQueryOptions = QueryOptions.getTimeoutMs(queryOptions); + if (timeoutFromQueryOptions != null) { + queryTimeoutMs = timeoutFromQueryOptions; + } + } + + // Query scheduler wait time already exceeds query timeout, directly return + if (querySchedulingTimeMs >= queryTimeoutMs) { + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1); + String errorMessage = String + .format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs, + queryTimeoutMs); + DataTable dataTable = new DataTableImplV2(); + dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage)); + LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); + sendResponse(queryRequest, streamObserver, dataTable); + return; + } + + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager == null) { + String errorMessage = "Failed to find table: " + tableNameWithType; + DataTable dataTable = new DataTableImplV2(); + dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage)); + LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); + sendResponse(queryRequest, streamObserver, dataTable); + return; + } + + List<String> segmentsToQuery = queryRequest.getSegmentsToQuery(); + List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentsToQuery); + + // When segment is removed from the IdealState: + // 1. Controller schedules a state transition to server to turn segment OFFLINE + // 2. Server gets the state transition, removes the segment data manager and update its CurrentState + // 3. Controller gathers the CurrentState and update the ExternalView + // 4. 
Broker watches ExternalView change and updates the routing table to stop querying the segment + // + // After step 2 but before step 4, segment will be missing on server side + // TODO: Change broker to watch both IdealState and ExternalView to not query the removed segments + int numSegmentsQueried = segmentsToQuery.size(); + int numSegmentsAcquired = segmentDataManagers.size(); + if (numSegmentsQueried > numSegmentsAcquired) { + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, + numSegmentsQueried - numSegmentsAcquired); + } + + boolean enableTrace = queryRequest.isEnableTrace(); + if (enableTrace) { + TraceContext.register(requestId); + } + + int numConsumingSegmentsProcessed = 0; + long minIndexTimeMs = Long.MAX_VALUE; + long minIngestionTimeMs = Long.MAX_VALUE; + // gather stats for realtime consuming segments + for (SegmentDataManager segmentMgr : segmentDataManagers) { + if (segmentMgr.getSegment() instanceof MutableSegment) { + numConsumingSegmentsProcessed += 1; + SegmentMetadata metadata = segmentMgr.getSegment().getSegmentMetadata(); + long indexedTime = metadata.getLastIndexedTimestamp(); + if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) { + minIndexTimeMs = metadata.getLastIndexedTimestamp(); + } + long ingestionTime = metadata.getLatestIngestionTimestamp(); + if (ingestionTime != Long.MIN_VALUE && ingestionTime < minIngestionTimeMs) { + minIngestionTimeMs = ingestionTime; + } + } + } + + long minConsumingFreshnessTimeMs = minIngestionTimeMs; + if (numConsumingSegmentsProcessed > 0) { + if (minIngestionTimeMs == Long.MAX_VALUE) { + LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead"); + minConsumingFreshnessTimeMs = minIndexTimeMs; + } + LOGGER + .debug("Querying: {} consuming segments with minConsumingFreshnessTimeMs: {}", numConsumingSegmentsProcessed, + minConsumingFreshnessTimeMs); + } + + DataTable dataTable = null; + try { + // Compute total docs for the table before pruning the segments + long numTotalDocs = 0; + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + numTotalDocs += segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs(); + } + TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING); + segmentDataManagers = _segmentPrunerService.prune(tableDataManager, segmentDataManagers, queryRequest); + segmentPruneTimer.stopAndRecord(); + int numSegmentsMatchedAfterPruning = segmentDataManagers.size(); + LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning); + if (numSegmentsMatchedAfterPruning == 0) { + dataTable = + queryRequest.isEnableStreaming() ? 
new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext); + Map<String, String> metadata = dataTable.getMetadata(); + metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs)); + metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0"); + metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0"); + metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0"); + metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0"); + metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0"); + } else { + TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN); + List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsMatchedAfterPruning); + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + indexSegments.add(segmentDataManager.getSegment()); + } + long endTimeMs = queryArrivalTimeMs + queryTimeoutMs; + Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker + .makeStreamingInstancePlan(indexSegments, queryContext, _executorService, streamObserver, endTimeMs) + : _planMaker.makeInstancePlan(indexSegments, queryContext, _executorService, endTimeMs); + planBuildTimer.stopAndRecord(); + + TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); + dataTable = globalQueryPlan.execute(); + planExecTimer.stopAndRecord(); + + // Update the total docs in the metadata based on un-pruned segments. + dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs)); + } + } catch (Exception e) { + _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1); + + // Do not log error for BadQueryRequestException because it's caused by bad query + if (e instanceof BadQueryRequestException) { + LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage()); + } else { + LOGGER.error("Exception processing requestId {}", requestId, e); + } + + dataTable = new DataTableImplV2(); + dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + tableDataManager.releaseSegment(segmentDataManager); + } + if (enableTrace) { + if (dataTable != null) { + dataTable.getMetadata().put(DataTable.TRACE_INFO_METADATA_KEY, TraceContext.getTraceInfo()); + } + TraceContext.unregister(); + } + } + + queryProcessingTimer.stopAndRecord(); + long queryProcessingTime = queryProcessingTimer.getDurationMs(); + dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried)); + dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime)); + + if (numConsumingSegmentsProcessed > 0) { + dataTable.getMetadata() + .put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.toString(numConsumingSegmentsProcessed)); + dataTable.getMetadata() + .put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.toString(minConsumingFreshnessTimeMs)); + } + + LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime); + LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable); + + // TODO: Log query stats + + sendResponse(queryRequest, streamObserver, dataTable); + } + + private void sendResponse(ServerQueryRequest queryRequest, StreamObserver<Server.ServerResponse> streamObserver, + DataTable dataTable) { + Server.ServerResponse response; 
+ try { + response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable) + : StreamingResponseUtils.getNonStreamingResponse(dataTable); + } catch (Exception e) { + LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}", + queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e); + _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1); + streamObserver.onError(Status.INTERNAL.withCause(e).asException()); + return; + } + streamObserver.onNext(response); + streamObserver.onCompleted(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index 1763335..24f2fa2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -19,14 +19,20 @@ package org.apache.pinot.core.query.request; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; +import org.apache.pinot.common.utils.CommonConstants.Query.Request; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TCompactProtocol; /** @@ -36,11 +42,14 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; * per segment basis. */ public class ServerQueryRequest { + private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler(); + private final long _requestId; + private final String _brokerId; + private final boolean _enableTrace; + private final boolean _enableStreaming; private final String _tableNameWithType; private final List<String> _segmentsToQuery; - private final boolean _enableTrace; - private final String _brokerId; // Timing information for different phases of query execution private final TimerContext _timerContext; @@ -51,11 +60,12 @@ public class ServerQueryRequest { public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics serverMetrics, long queryArrivalTimeMs) { _requestId = instanceRequest.getRequestId(); + _brokerId = instanceRequest.getBrokerId() != null ? instanceRequest.getBrokerId() : "unknown"; + _enableTrace = instanceRequest.isEnableTrace(); + _enableStreaming = false; BrokerRequest brokerRequest = instanceRequest.getQuery(); _tableNameWithType = brokerRequest.getQuerySource().getTableName(); _segmentsToQuery = instanceRequest.getSearchSegments(); - _enableTrace = instanceRequest.isEnableTrace(); - _brokerId = instanceRequest.getBrokerId() != null ? 
instanceRequest.getBrokerId() : "unknown"; _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs); // Pre-compute segment independent information @@ -63,24 +73,59 @@ public class ServerQueryRequest { _allColumns = QueryContextUtils.getAllColumns(_queryContext); } - public long getRequestId() { - return _requestId; + public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serverMetrics) + throws Exception { + long queryArrivalTimeMs = System.currentTimeMillis(); + + Map<String, String> metadata = serverRequest.getMetadataMap(); + _requestId = Long.parseLong(metadata.getOrDefault(Request.MetadataKeys.REQUEST_ID, "0")); + _brokerId = metadata.getOrDefault(Request.MetadataKeys.BROKER_ID, "unknown"); + _enableTrace = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_TRACE)); + _enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING)); + + BrokerRequest brokerRequest; + String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL); + if (payloadType.equalsIgnoreCase(Request.PayloadType.SQL)) { + brokerRequest = SQL_COMPILER.compileToBrokerRequest(serverRequest.getSql()); + } else if (payloadType.equalsIgnoreCase(Request.PayloadType.BROKER_REQUEST)) { + brokerRequest = new BrokerRequest(); + new TDeserializer(new TCompactProtocol.Factory()) + .deserialize(brokerRequest, serverRequest.getPayload().toByteArray()); + } else { + throw new UnsupportedOperationException("Unsupported payloadType: " + payloadType); + } + + _tableNameWithType = brokerRequest.getQuerySource().getTableName(); + _segmentsToQuery = serverRequest.getSegmentsList(); + _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs); + + // Pre-compute segment independent information + _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + _allColumns = QueryContextUtils.getAllColumns(_queryContext); } - public String getTableNameWithType() { - return _tableNameWithType; + public long getRequestId() { + return _requestId; } - public List<String> getSegmentsToQuery() { - return _segmentsToQuery; + public String getBrokerId() { + return _brokerId; } public boolean isEnableTrace() { return _enableTrace; } - public String getBrokerId() { - return _brokerId; + public boolean isEnableStreaming() { + return _enableStreaming; + } + + public String getTableNameWithType() { + return _tableNameWithType; + } + + public List<String> getSegmentsToQuery() { + return _segmentsToQuery; } public TimerContext getTimerContext() { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java similarity index 54% rename from pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java rename to pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java index e0fef72..7413070 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/NettyServerConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java @@ -16,28 +16,34 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.server.conf; +package org.apache.pinot.core.transport.grpc; -import java.util.Optional; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import java.io.IOException; +import org.apache.pinot.core.query.executor.GrpcQueryExecutor; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.pinot.spi.env.PinotConfiguration; +public class GrpcQueryServer { + private final Server _server; -public class NettyServerConfig { - private static String NETTY_SERVER_PORT = "port"; - - private final int port; + public GrpcQueryServer(int port, GrpcQueryExecutor queryExecutor) { + _server = ServerBuilder.forPort(port).addService(queryExecutor).build(); + } - public NettyServerConfig(PinotConfiguration serverNettyConfig) throws ConfigurationException { - this.port = Optional.ofNullable(serverNettyConfig.getProperty(NETTY_SERVER_PORT, Integer.class)) - .orElseThrow(() -> new ConfigurationException("Cannot find Key : " + NETTY_SERVER_PORT)); + public void start() { + try { + _server.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - /** - * @return Netty server port - */ - public int getPort() { - return port; + public void shutdown() { + try { + _server.shutdown().awaitTermination(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java index 5ee3da2..e45b441 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java @@ -230,7 +230,7 @@ public class SelectionCombineOperatorTest { } CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR, System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null); return combinePlanNode.run().nextBlock(); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java index b0d5d13..11053c7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/CombinePlanNodeTest.java @@ -58,7 +58,7 @@ public class CombinePlanNodeTest { } CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null); combinePlanNode.run(); Assert.assertEquals(numPlans, count.get()); } @@ -83,7 +83,7 @@ public class CombinePlanNodeTest { } CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100, - InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null); try { combinePlanNode.run(); } catch (RuntimeException e) { @@ -105,7 +105,7 @@ public class CombinePlanNodeTest { } CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS, - 
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT, null); try { combinePlanNode.run(); } catch (RuntimeException e) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index c66a6f2..c1a3a85 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.integration.tests; +import org.testng.annotations.Test; + + /** * Integration test that extends OfflineClusterIntegrationTest but starts multiple brokers and servers. */ @@ -34,4 +37,15 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg protected int getNumServers() { return NUM_SERVERS; } + + @Override + protected void startServers() { + startServers(NUM_SERVERS); + } + + @Test(enabled = false) + @Override + public void testGrpcQueryServer() { + // Ignored + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 7475715..790968a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -30,21 +30,31 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.core.common.datatable.DataTableFactory; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair; +import org.apache.pinot.pql.parsers.Pql2Compiler; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -130,7 +140,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet startZk(); startController(); startBrokers(getNumBrokers()); - startServers(getNumServers()); + startServers();
// Create and upload the schema and table config Schema schema = createSchema(); @@ -160,6 +170,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet waitForAllDocsLoaded(600_000L); } + protected void startServers() { + // Enable gRPC server + PinotConfiguration serverConfig = getDefaultServerConfiguration(); + serverConfig.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_GRPC_SERVER, true); + startServer(serverConfig); + } + private void registerCallbackHandlers() { List<String> instances = _helixAdmin.getInstancesInCluster(getHelixClusterName()); instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance @@ -973,15 +990,18 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet public void testQueryWithSameAlias() throws Exception { //test repeated columns in selection query - String query = "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC"; + String query = + "SELECT ArrTime AS ArrTime, Carrier AS Carrier, DaysSinceEpoch AS DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC"; testQuery(query, Collections.singletonList(query)); //test repeated columns in selection query - query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC"; + query = + "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC"; testQuery(query, Collections.singletonList(query)); //test repeated columns in selection query - query = "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC"; + query = + "SELECT ArrTime AS ArrTime, DaysSinceEpoch AS DaysSinceEpoch, Carrier AS Carrier FROM mytable ORDER BY Carrier DESC, ArrTime DESC"; testQuery(query, Collections.singletonList(query)); } @@ -1124,11 +1144,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet sql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID"; testSqlQuery(pql, Collections.singletonList(sql)); - pql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName LIMIT 1000000"; + pql = + "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName LIMIT 1000000"; sql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName"; testSqlQuery(pql, Collections.singletonList(sql)); - pql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName LIMIT 1000000"; + pql = + "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName LIMIT 1000000"; sql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName"; testSqlQuery(pql, Collections.singletonList(sql)); @@ -1160,7 +1182,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); List<String> queries = new ArrayList<>(); baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); - baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", 
"DAYSSinceEpOch"))); + baseQueries + .forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"))); for (String query : queries) { try { @@ -1213,8 +1236,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable", "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')"); List<String> queries = new ArrayList<>(); - baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); - baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); + baseQueries + .forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); + baseQueries.forEach( + q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch"))); for (String query : queries) { try { @@ -1279,6 +1304,61 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet query = "SELECT c_o_u_n_t(FlightNum) FROM mytable "; assertEquals(postQuery(query).get("aggregationResults").get(0).get("value").asLong(), 115545); assertEquals(postSqlQuery(query, _brokerBaseApiUrl).get("resultTable").get("rows").get(0).get(0).asLong(), 115545); + } + + @Test + public void testGrpcQueryServer() + throws Exception { + GrpcQueryClient queryClient = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT); + String sql = "SELECT * FROM mytable_OFFLINE LIMIT 1000000"; + BrokerRequest brokerRequest = new Pql2Compiler().compileToBrokerRequest(sql); + List<String> segments = _helixResourceManager.getSegmentsFor("mytable_OFFLINE"); + + GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder().setSegments(segments); + testNonStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build())); + testNonStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build())); + + requestBuilder.setEnableStreaming(true); + testStreamingRequest(queryClient.submit(requestBuilder.setSql(sql).build())); + testStreamingRequest(queryClient.submit(requestBuilder.setBrokerRequest(brokerRequest).build())); + } + + private void testNonStreamingRequest(Iterator<Server.ServerResponse> nonStreamingResponses) + throws Exception { + int expectedNumDocs = (int) getCountStarResult(); + assertTrue(nonStreamingResponses.hasNext()); + Server.ServerResponse nonStreamingResponse = nonStreamingResponses.next(); + assertEquals(nonStreamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE), + CommonConstants.Query.Response.ResponseType.NON_STREAMING); + DataTable dataTable = DataTableFactory.getDataTable(nonStreamingResponse.getPayload().asReadOnlyByteBuffer()); + assertNotNull(dataTable.getDataSchema()); + assertEquals(dataTable.getNumberOfRows(), expectedNumDocs); + Map<String, String> metadata = dataTable.getMetadata(); + assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs)); + } + private void testStreamingRequest(Iterator<Server.ServerResponse> streamingResponses) + throws Exception { + int expectedNumDocs = (int) getCountStarResult(); + int numTotalDocs = 0; + while (streamingResponses.hasNext()) { + Server.ServerResponse streamingResponse = streamingResponses.next(); + DataTable dataTable = 
DataTableFactory.getDataTable(streamingResponse.getPayload().asReadOnlyByteBuffer()); + String responseType = + streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE); + if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) { + assertTrue(dataTable.getMetadata().isEmpty()); + assertNotNull(dataTable.getDataSchema()); + numTotalDocs += dataTable.getNumberOfRows(); + } else { + assertEquals(responseType, CommonConstants.Query.Response.ResponseType.METADATA); + assertFalse(streamingResponses.hasNext()); + assertEquals(numTotalDocs, expectedNumDocs); + assertNull(dataTable.getDataSchema()); + assertEquals(dataTable.getNumberOfRows(), 0); + Map<String, String> metadata = dataTable.getMetadata(); + assertEquals(metadata.get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY), Integer.toString(expectedNumDocs)); + } + } } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java index e7c9a97..1b5087f 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java @@ -20,9 +20,8 @@ package org.apache.pinot.server.conf; import java.util.Arrays; import java.util.List; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix; +import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.spi.env.PinotConfiguration; @@ -30,6 +29,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; * The config used for Server. */ public class ServerConf { + // TODO: Replace with constants in CommonConstants private static final String PINOT_ = "pinot."; private static final String PINOT_SERVER_INSTANCE = "pinot.server.instance"; private static final String PINOT_SERVER_METRICS = "pinot.server.metrics"; @@ -37,7 +37,6 @@ public class ServerConf { private static final String PINOT_SERVER_TABLE_LEVEL_METRICS = "pinot.server.enableTableLevelMetrics"; private static final String PINOT_SERVER_QUERY = "pinot.server.query.executor"; private static final String PINOT_SERVER_REQUEST = "pinot.server.request"; - private static final String PINOT_SERVER_NETTY = "pinot.server.netty"; private static final String PINOT_SERVER_INSTANCE_DATA_MANAGER_CLASS = "pinot.server.instance.data.manager.class"; private static final String PINOT_SERVER_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class"; private static final String PINOT_SERVER_TRANSFORM_FUNCTIONS = "pinot.server.transforms"; @@ -70,9 +69,16 @@ public class ServerConf { return _serverConf.subset(PINOT_SERVER_METRICS); } - public NettyServerConfig getNettyConfig() - throws ConfigurationException { - return new NettyServerConfig(_serverConf.subset(PINOT_SERVER_NETTY)); + public int getNettyPort() { + return _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT); + } + + public boolean isEnableGrpcServer() { + return _serverConf.getProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, Server.DEFAULT_ENABLE_GRPC_SERVER); + } + + public int getGrpcPort() { + return _serverConf.getProperty(Server.CONFIG_OF_GRPC_PORT, Server.DEFAULT_GRPC_PORT); } public PinotConfiguration getConfig(String component) { @@ -104,6 +110,6 @@ public class ServerConf { } public String getMetricsPrefix() { - return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX, 
CommonConstants.Server.DEFAULT_METRICS_PREFIX); + return _serverConf.getProperty(PINOT_SERVER_METRICS_PREFIX, Server.DEFAULT_METRICS_PREFIX); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index ae448ad..fa1903c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -30,11 +30,14 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory; +import org.apache.pinot.core.query.executor.GrpcQueryExecutor; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory; import org.apache.pinot.core.transport.QueryServer; +import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +54,8 @@ public class ServerInstance { private final QueryExecutor _queryExecutor; private final LongAccumulator _latestQueryTime; private final QueryScheduler _queryScheduler; - private final QueryServer _queryServer; + private final QueryServer _nettyQueryServer; + private final GrpcQueryServer _grpcQueryServer; private boolean _started = false; @@ -77,16 +81,27 @@ public class ServerInstance { String queryExecutorClassName = serverConf.getQueryExecutorClassName(); LOGGER.info("Initializing query executor of class: {}", queryExecutorClassName); _queryExecutor = (QueryExecutor) Class.forName(queryExecutorClassName).newInstance(); - _queryExecutor.init(serverConf.getQueryExecutorConfig(), _instanceDataManager, _serverMetrics); + PinotConfiguration queryExecutorConfig = serverConf.getQueryExecutorConfig(); + _queryExecutor.init(queryExecutorConfig, _instanceDataManager, _serverMetrics); LOGGER.info("Initializing query scheduler"); _latestQueryTime = new LongAccumulator(Long::max, 0); _queryScheduler = QuerySchedulerFactory.create(serverConf.getSchedulerConfig(), _queryExecutor, _serverMetrics, _latestQueryTime); - int queryServerPort = serverConf.getNettyConfig().getPort(); - LOGGER.info("Initializing query server on port: {}", queryServerPort); - _queryServer = new QueryServer(queryServerPort, _queryScheduler, _serverMetrics); + int nettyPort = serverConf.getNettyPort(); + LOGGER.info("Initializing Netty query server on port: {}", nettyPort); + _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, _serverMetrics); + + if (serverConf.isEnableGrpcServer()) { + int grpcPort = serverConf.getGrpcPort(); + LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); + GrpcQueryExecutor grpcQueryExecutor = + new GrpcQueryExecutor(queryExecutorConfig, _instanceDataManager, _serverMetrics); + _grpcQueryServer = new GrpcQueryServer(grpcPort, grpcQueryExecutor); + } else { + _grpcQueryServer = null; + } LOGGER.info("Initializing transform functions"); Set<Class<TransformFunction>> transformFunctionClasses = new HashSet<>(); @@ -119,8 +134,12 @@ public class ServerInstance { _queryExecutor.start(); LOGGER.info("Starting query scheduler"); 
_queryScheduler.start(); - LOGGER.info("Starting query server"); - _queryServer.start(); + LOGGER.info("Starting Netty query server"); + _nettyQueryServer.start(); + if (_grpcQueryServer != null) { + LOGGER.info("Starting gRPC query server"); + _grpcQueryServer.start(); + } _started = true; LOGGER.info("Finish starting server instance"); @@ -130,8 +149,12 @@ public class ServerInstance { Preconditions.checkState(_started, "Server instance is not running"); LOGGER.info("Shutting down server instance"); - LOGGER.info("Shutting down query server"); - _queryServer.shutDown(); + if (_grpcQueryServer != null) { + LOGGER.info("Shutting down gRPC query server"); + _grpcQueryServer.shutdown(); + } + LOGGER.info("Shutting down Netty query server"); + _nettyQueryServer.shutDown(); LOGGER.info("Shutting down query scheduler"); _queryScheduler.stop(); LOGGER.info("Shutting down query executor"); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java deleted file mode 100644 index bee45de..0000000 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/grpc/PinotQueryService.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.server.starter.grpc; - -public class PinotQueryService { - -} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java index abe2275..b419d83 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java @@ -18,54 +18,17 @@ */ package org.apache.pinot.server.starter.helix; -import static org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS; -import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS; -import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT; -import static org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME; -import static org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS; -import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST; -import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT; -import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE; -import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE; -import static org.apache.pinot.common.utils.CommonConstants.Server.ACCESS_CONTROL_FACTORY_CLASS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; -import static 
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS; -import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS; - +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; - import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -85,9 +48,10 @@ import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.Instance; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel; +import org.apache.pinot.common.utils.CommonConstants.Server; import org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol; import org.apache.pinot.common.utils.NetUtil; import org.apache.pinot.common.utils.ServiceStatus; @@ -110,9 +74,6 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - /** * Starter for Pinot server. @@ -142,6 +103,7 @@ public class HelixServerStarter implements ServiceStartable { private final String _host; private final int _port; private final String _instanceId; + private final HelixConfigScope _instanceConfigScope; private HelixManager _helixManager; private HelixAdmin _helixAdmin; private ServerInstance _serverInstance; @@ -155,23 +117,21 @@ public class HelixServerStarter implements ServiceStartable { // Make a clone so that changes to the config won't propagate to the caller _serverConf = serverConf.clone(); - _host = _serverConf.getProperty(KEY_OF_SERVER_NETTY_HOST, - _serverConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtil - .getHostnameOrAddress() : NetUtil.getHostAddress()); - _port = _serverConf.getProperty(KEY_OF_SERVER_NETTY_PORT, DEFAULT_SERVER_NETTY_PORT); + _host = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST, + _serverConf.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? 
NetUtil.getHostnameOrAddress() + : NetUtil.getHostAddress()); + _port = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_PORT, Helix.DEFAULT_SERVER_NETTY_PORT); - _instanceId = Optional.ofNullable(_serverConf.getProperty(CONFIG_OF_INSTANCE_ID)) - - // InstanceId is not configured. Fallback to an auto generated config. - .orElseGet(this::initializeDefaultInstanceId); - } - - private String initializeDefaultInstanceId() { - String instanceId = PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port; - - _serverConf.addProperty(CONFIG_OF_INSTANCE_ID, instanceId); + String instanceId = _serverConf.getProperty(Server.CONFIG_OF_INSTANCE_ID); + if (instanceId == null) { + instanceId = Helix.PREFIX_OF_SERVER_INSTANCE + _host + "_" + _port; + _serverConf.addProperty(Server.CONFIG_OF_INSTANCE_ID, instanceId); + } + _instanceId = instanceId; - return instanceId; + _instanceConfigScope = + new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId) + .build(); } /** @@ -179,9 +139,11 @@ public class HelixServerStarter implements ServiceStartable { */ private void registerServiceStatusHandler() { double minResourcePercentForStartup = _serverConf - .getProperty(CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START); - int realtimeConsumptionCatchupWaitMs = _serverConf.getProperty(CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS, - DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS); + .getProperty(Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START, + Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START); + int realtimeConsumptionCatchupWaitMs = _serverConf + .getProperty(Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS, + Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS); // collect all resources which have this instance in the ideal state List<String> resourcesToMonitor = new ArrayList<>(); @@ -235,25 +197,6 @@ public class HelixServerStarter implements ServiceStartable { new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build())); } - private void setAdminApiPort(int adminApiPort) { - Map<String, String> propToUpdate = new HashMap<>(); - propToUpdate.put(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort)); - updateInstanceConfigInHelix(propToUpdate); - } - - private void setShuttingDownStatus(boolean shuttingDownStatus) { - Map<String, String> propToUpdate = new HashMap<>(); - propToUpdate.put(IS_SHUTDOWN_IN_PROGRESS, String.valueOf(shuttingDownStatus)); - updateInstanceConfigInHelix(propToUpdate); - } - - private void updateInstanceConfigInHelix(Map<String, String> props) { - HelixConfigScope scope = - new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT, _helixClusterName).forParticipant(_instanceId) - .build(); - _helixAdmin.setConfig(scope, props); - } - private void updateInstanceConfigIfNeeded(String host, int port) { InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, _instanceId); boolean needToUpdateInstanceConfig = false; @@ -265,7 +208,7 @@ public class HelixServerStarter implements ServiceStartable { instanceConfig.addTag(TagNameUtils.getOfflineTagForTenant(null)); instanceConfig.addTag(TagNameUtils.getRealtimeTagForTenant(null)); } else { - instanceConfig.addTag(UNTAGGED_SERVER_INSTANCE); + instanceConfig.addTag(Helix.UNTAGGED_SERVER_INSTANCE); } needToUpdateInstanceConfig = true; } @@ -303,7 +246,7 @@ public class HelixServerStarter implements ServiceStartable { // from ZooKeeper). 
Setting flapping time window to a small value can avoid this from happening. Helix ignores the // non-positive value, so set the default value as 1. System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, - _serverConf.getProperty(CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, DEFAULT_FLAPPING_TIME_WINDOW_MS)); + _serverConf.getProperty(Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); } /** @@ -314,8 +257,8 @@ public class HelixServerStarter implements ServiceStartable { private void startupServiceStatusCheck(long endTimeMs) { LOGGER.info("Starting startup service status check"); long startTimeMs = System.currentTimeMillis(); - long checkIntervalMs = _serverConf - .getProperty(CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS); + long checkIntervalMs = _serverConf.getProperty(Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS, + Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS); while (System.currentTimeMillis() < endTimeMs) { Status serviceStatus = ServiceStatus.getServiceStatus(); @@ -370,7 +313,8 @@ public class HelixServerStarter implements ServiceStartable { _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); - SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics); + SegmentFetcherAndLoader fetcherAndLoader = + new SegmentFetcherAndLoader(_serverConf, instanceDataManager, serverMetrics); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader); _helixManager.getStateMachineEngine() @@ -386,7 +330,7 @@ public class HelixServerStarter implements ServiceStartable { // Start restlet server for admin API endpoint String accessControlFactoryClass = - _serverConf.getProperty(ACCESS_CONTROL_FACTORY_CLASS, DEFAULT_ACCESS_CONTROL_FACTORY_CLASS); + _serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS, Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS); LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass); final AccessControlFactory accessControlFactory; try { @@ -397,10 +341,20 @@ public class HelixServerStarter implements ServiceStartable { + "'", e); } - int adminApiPort = _serverConf.getProperty(CONFIG_OF_ADMIN_API_PORT, DEFAULT_ADMIN_API_PORT); + // Update admin API port + int adminApiPort = _serverConf.getProperty(Server.CONFIG_OF_ADMIN_API_PORT, Server.DEFAULT_ADMIN_API_PORT); _adminApiApplication = new AdminApiApplication(_serverInstance, accessControlFactory); _adminApiApplication.start(adminApiPort); - setAdminApiPort(adminApiPort); + _helixAdmin.setConfig(_instanceConfigScope, + Collections.singletonMap(Instance.ADMIN_PORT_KEY, String.valueOf(adminApiPort))); + + // Update gRPC port + if (serverInstanceConfig.isEnableGrpcServer()) { + _helixAdmin.setConfig(_instanceConfigScope, + Collections.singletonMap(Instance.GRPC_PORT_KEY, String.valueOf(serverInstanceConfig.getGrpcPort()))); + } else { + _helixAdmin.removeConfig(_instanceConfigScope, Collections.singletonList(Instance.GRPC_PORT_KEY)); + } // Register message handler factory SegmentMessageHandlerFactory messageHandlerFactory = @@ -408,19 +362,21 @@ public class HelixServerStarter implements ServiceStartable { _helixManager.getMessagingService() 
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory); - serverMetrics.addCallbackGauge(INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L); + serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L); _helixManager .addPreConnectCallback(() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); // Register the service status handler registerServiceStatusHandler(); - if (_serverConf - .getProperty(CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) { - long endTimeMs = startTimeMs + _serverConf.getProperty(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS); + if (_serverConf.getProperty(Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK, + Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) { + long endTimeMs = + startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS, Server.DEFAULT_STARTUP_TIMEOUT_MS); startupServiceStatusCheck(endTimeMs); } - setShuttingDownStatus(false); + _helixAdmin.setConfig(_instanceConfigScope, + Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false))); LOGGER.info("Pinot server ready"); // Create metrics for mmap stuff @@ -446,15 +402,19 @@ public class HelixServerStarter implements ServiceStartable { LOGGER.warn("Caught exception closing PinotFS classes", e); } _adminApiApplication.stop(); - setShuttingDownStatus(true); + _helixAdmin.setConfig(_instanceConfigScope, + Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true))); - long endTimeMs = startTimeMs + _serverConf.getProperty(CONFIG_OF_SHUTDOWN_TIMEOUT_MS, DEFAULT_SHUTDOWN_TIMEOUT_MS); - if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) { + long endTimeMs = + startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, Server.DEFAULT_SHUTDOWN_TIMEOUT_MS); + if (_serverConf + .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK)) { shutdownQueryCheck(endTimeMs); } _helixManager.disconnect(); _serverInstance.shutDown(); - if (_serverConf.getProperty(CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) { + if (_serverConf + .getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK, Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK)) { shutdownResourceCheck(endTimeMs); } _realtimeLuceneIndexRefreshState.stop(); @@ -472,8 +432,9 @@ public class HelixServerStarter implements ServiceStartable { LOGGER.info("Starting shutdown query check"); long startTimeMs = System.currentTimeMillis(); - long maxQueryTimeMs = _serverConf.getProperty(CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); - long noQueryThresholdMs = _serverConf.getProperty(CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs); + long maxQueryTimeMs = + _serverConf.getProperty(Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); + long noQueryThresholdMs = _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS, maxQueryTimeMs); // Wait until no incoming queries boolean noIncomingQueries = false; @@ -553,8 +514,8 @@ public class HelixServerStarter implements ServiceStartable { } } - long checkIntervalMs = _serverConf - .getProperty(CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS); + long checkIntervalMs = 
_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS, + Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS); while (System.currentTimeMillis() < endTimeMs) { Iterator<String> iterator = resourcesToMonitor.iterator(); String currentResource = null; @@ -639,9 +600,9 @@ public class HelixServerStarter implements ServiceStartable { throws Exception { Map<String, Object> properties = new HashMap<>(); int port = 8003; - properties.put(KEY_OF_SERVER_NETTY_PORT, port); - properties.put(CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index"); - properties.put(CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar"); + properties.put(Helix.KEY_OF_SERVER_NETTY_PORT, port); + properties.put(Server.CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/test" + port + "/index"); + properties.put(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/test" + port + "/segmentTar"); HelixServerStarter serverStarter = new HelixServerStarter("quickstart", "localhost:2191", new PinotConfiguration(properties));
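
For anyone trying the new code path locally, here is a minimal sketch (illustrative, not part of the commit) of starting a server with the gRPC query server enabled, adapted from the quickstart main() above. The cluster address, ports, and data directories are placeholders, and it assumes the ServiceStartable start() entry point:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.common.utils.CommonConstants.Helix;
    import org.apache.pinot.common.utils.CommonConstants.Server;
    import org.apache.pinot.server.starter.helix.HelixServerStarter;
    import org.apache.pinot.spi.env.PinotConfiguration;

    public class GrpcEnabledServerStarter {
      public static void main(String[] args)
          throws Exception {
        Map<String, Object> properties = new HashMap<>();
        properties.put(Helix.KEY_OF_SERVER_NETTY_PORT, 8098); // placeholder Netty port
        properties.put(Server.CONFIG_OF_INSTANCE_DATA_DIR, "/tmp/PinotServer/grpcDemo/index");
        properties.put(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, "/tmp/PinotServer/grpcDemo/segmentTar");
        // The gRPC query server is disabled by default; opt in explicitly.
        // It listens on Server.DEFAULT_GRPC_PORT unless Server.CONFIG_OF_GRPC_PORT overrides it.
        properties.put(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
        HelixServerStarter serverStarter =
            new HelixServerStarter("quickstart", "localhost:2191", new PinotConfiguration(properties));
        serverStarter.start();
      }
    }

When the flag is set, ServerInstance builds a GrpcQueryExecutor from the same query executor config as the Netty path and registers it with GrpcQueryServer, so both transports serve the same segments.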
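A matching client-side sketch (also illustrative), mirroring testGrpcQueryServer above: it streams a selection-only query through GrpcRequestBuilder and GrpcQueryClient, assuming build() yields the Server.ServerRequest that submit() accepts; host, port, table, and segment names are placeholders:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.pinot.common.proto.Server;
    import org.apache.pinot.common.utils.CommonConstants;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
    import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
    import org.apache.pinot.core.common.datatable.DataTableFactory;

    public class GrpcStreamingClientExample {
      public static void main(String[] args)
          throws Exception {
        GrpcQueryClient client = new GrpcQueryClient("localhost", CommonConstants.Server.DEFAULT_GRPC_PORT);
        Server.ServerRequest request = new GrpcRequestBuilder()
            .setSegments(Arrays.asList("mytable_OFFLINE_0", "mytable_OFFLINE_1")) // placeholder segments
            .setEnableStreaming(true) // streaming currently supports selection-only queries
            .setSql("SELECT * FROM mytable_OFFLINE LIMIT 1000000")
            .build();
        Iterator<Server.ServerResponse> responses = client.submit(request);
        while (responses.hasNext()) {
          Server.ServerResponse response = responses.next();
          String responseType =
              response.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
          DataTable dataTable = DataTableFactory.getDataTable(response.getPayload().asReadOnlyByteBuffer());
          if (CommonConstants.Query.Response.ResponseType.DATA.equals(responseType)) {
            // DATA blocks carry rows and a data schema but no metadata
            System.out.println("Got data block with " + dataTable.getNumberOfRows() + " rows");
          } else {
            // The trailing METADATA block carries execution stats and no rows
            System.out.println("Done, numDocsScanned="
                + dataTable.getMetadata().get(DataTable.NUM_DOCS_SCANNED_METADATA_KEY));
          }
        }
      }
    }

As the integration test shows, only the trailing METADATA block carries the execution stats, while a non-streaming request over the same channel returns a single NON_STREAMING response whose payload is the complete serialized DataTable.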