This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch multi_stage_query_engine in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/multi_stage_query_engine by this push: new 82623d8f71 Merge Multistage Engine back to main (#8720) 82623d8f71 is described below commit 82623d8f71683045937a446c85e97edb994b9612 Author: Rong Rong <ro...@apache.org> AuthorDate: Fri Jun 3 08:56:46 2022 -1000 Merge Multistage Engine back to main (#8720) * add e2e quickstart; all commits should squash into this until PR created list of work items: - modified the entrypoint in server/broker to run dispatcher and query-worker - modified the pinot-tools configs to start it properly * more merge back contents 1. added broker request handler delegate, that redirects to one of the 3 handlers 2. making multi-stage engine service port and mailbox port separate Helix ZNode field 3. fix several bugs * revert changes to conf from quickstart demo. and fix tests * address diff comments * address diff comments Co-authored-by: Rong Rong <ro...@startree.ai> --- pinot-broker/pom.xml | 8 + .../broker/broker/helix/BaseBrokerStarter.java | 30 ++- .../requesthandler/BaseBrokerRequestHandler.java | 2 +- .../BrokerRequestHandlerDelegate.java | 84 +++++++++ .../requesthandler/GrpcBrokerRequestHandler.java | 4 + .../MultiStageBrokerRequestHandler.java | 206 +++++++++++++++++++++ .../SingleConnectionBrokerRequestHandler.java | 1 + .../pinot/common/utils/config/InstanceUtils.java | 12 ++ .../common/utils/config/InstanceUtilsTest.java | 26 ++- .../resources/PinotInstanceRestletResource.java | 28 +++ .../api/PinotInstanceRestletResourceTest.java | 15 +- .../PinotHelixResourceManagerStatelessTest.java | 5 +- .../helix/core/PinotHelixResourceManagerTest.java | 2 +- .../pinot/core/transport/ServerInstance.java | 17 ++ .../apache/pinot/query/routing/WorkerInstance.java | 15 +- .../apache/pinot/query/routing/WorkerManager.java | 9 +- .../pinot/query/QueryEnvironmentTestUtils.java | 4 +- .../runtime/operator/MailboxReceiveOperator.java | 2 +- .../runtime/operator/MailboxSendOperator.java | 2 +- 
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 7 +- .../pinot/query/service/QueryDispatcher.java | 19 +- .../pinot/query/runtime/QueryRunnerTest.java | 6 +- .../pinot/query/service/QueryServerTest.java | 3 +- pinot-server/pom.xml | 8 + .../org/apache/pinot/server/conf/ServerConf.java | 5 + .../pinot/server/starter/ServerInstance.java | 65 ++++--- .../pinot/server/worker/WorkerQueryServer.java | 102 ++++++++++ .../apache/pinot/spi/config/instance/Instance.java | 23 +++ .../apache/pinot/spi/utils/CommonConstants.java | 12 ++ 29 files changed, 656 insertions(+), 66 deletions(-) diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml index d354cc7c28..f9bc5da067 100644 --- a/pinot-broker/pom.xml +++ b/pinot-broker/pom.xml @@ -57,6 +57,14 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-query-planner</artifactId> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-query-runtime</artifactId> + </dependency> <!-- Jersey & Swagger --> <dependency> diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 45e918875e..351ce1ea7c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -41,7 +41,9 @@ import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.broker.BrokerAdminApiApplication; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; +import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate; import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler; +import 
org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler; import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.Utils; @@ -251,24 +253,38 @@ public abstract class BaseBrokerStarter implements ServiceStartable { TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX); NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX); - if (_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE) - .equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { - LOGGER.info("Starting Grpc BrokerRequestHandler."); - _brokerRequestHandler = + // Create Broker request handler. + String brokerRequestHandlerType = + _brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE); + BrokerRequestHandler singleStageBrokerRequestHandler = null; + if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { + singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, null); } else { // default request handler type, e.g. 
netty - LOGGER.info("Starting Netty BrokerRequestHandler."); if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) { - _brokerRequestHandler = + singleStageBrokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults); } else { - _brokerRequestHandler = + singleStageBrokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null); } } + + MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null; + if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { + // multi-stage request handler uses both Netty and GRPC ports. + // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport. + // TODO: decouple protocol and engine selection. 
+ multiStageBrokerRequestHandler = + new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, + tableCache, _brokerMetrics); + } + + _brokerRequestHandler = new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, + multiStageBrokerRequestHandler); _brokerRequestHandler.start(); String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL); if (controllerUrl != null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index bdb647570a..1d2dd7ead8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -1612,7 +1612,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { RequestContext requestContext) throws Exception; - private static void augmentStatistics(RequestContext statistics, BrokerResponse response) { + protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setTotalDocs(response.getTotalDocs()); statistics.setNumDocsScanned(response.getNumDocsScanned()); statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java new file mode 100644 index 0000000000..a39ad1d265 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import javax.annotation.Nullable; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * {@code BrokerRequestHandlerDelegate} delegates the inbound broker request to one of the enabled + * {@link BrokerRequestHandler} based on the requested handle type. 
+ * + * @see CommonConstants.Broker.Request.QueryOptionKey#USE_MULTISTAGE_ENGINE + */ +public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { + + private final BrokerRequestHandler _singleStageBrokerRequestHandler; + private final MultiStageBrokerRequestHandler _multiStageWorkerRequestHandler; + + private final boolean _isMultiStageQueryEngineEnabled; + + + public BrokerRequestHandlerDelegate( + BrokerRequestHandler singleStageBrokerRequestHandler, + @Nullable MultiStageBrokerRequestHandler multiStageWorkerRequestHandler + ) { + _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler; + _multiStageWorkerRequestHandler = multiStageWorkerRequestHandler; + _isMultiStageQueryEngineEnabled = _multiStageWorkerRequestHandler != null; + } + + @Override + public void start() { + if (_singleStageBrokerRequestHandler != null) { + _singleStageBrokerRequestHandler.start(); + } + if (_multiStageWorkerRequestHandler != null) { + _multiStageWorkerRequestHandler.start(); + } + } + + @Override + public void shutDown() { + if (_singleStageBrokerRequestHandler != null) { + _singleStageBrokerRequestHandler.shutDown(); + } + if (_multiStageWorkerRequestHandler != null) { + _multiStageWorkerRequestHandler.shutDown(); + } + } + + @Override + public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, + RequestContext requestContext) + throws Exception { + if (_isMultiStageQueryEngineEnabled && _multiStageWorkerRequestHandler != null) { + JsonNode node = request.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE); + if (node != null && node.asBoolean()) { + return _multiStageWorkerRequestHandler.handleRequest(request, requesterIdentity, requestContext); + } + } + return _singleStageBrokerRequestHandler.handleRequest(request, requesterIdentity, requestContext); + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 468add98c2..0bd33a9289 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -42,6 +42,8 @@ import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.RequestContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -49,6 +51,7 @@ import org.apache.pinot.spi.trace.RequestContext; */ @ThreadSafe public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class); private final GrpcQueryClient.Config _grpcConfig; private final StreamingReduceService _streamingReduceService; @@ -59,6 +62,7 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + LOGGER.info("Using Grpc BrokerRequestHandler."); _grpcConfig = buildGrpcQueryClientConfig(config); // create streaming query client diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java new file mode 100644 index 0000000000..e992031288 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.calcite.jdbc.CalciteSchemaBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.broker.api.RequesterIdentity; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.proto.Mailbox; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.query.selection.SelectionOperatorUtils; +import org.apache.pinot.core.transport.ServerInstance; +import 
org.apache.pinot.query.QueryEnvironment; +import org.apache.pinot.query.catalog.PinotCatalog; +import org.apache.pinot.query.mailbox.GrpcMailboxService; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.routing.WorkerManager; +import org.apache.pinot.query.service.QueryConfig; +import org.apache.pinot.query.service.QueryDispatcher; +import org.apache.pinot.query.type.TypeFactory; +import org.apache.pinot.query.type.TypeSystem; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class); + private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L; + private final String _reducerHostname; + private final int _reducerPort; + + private final MailboxService<Mailbox.MailboxContent> _mailboxService; + private QueryEnvironment _queryEnvironment; + private QueryDispatcher _queryDispatcher; + + public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + BrokerMetrics brokerMetrics) { + super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + LOGGER.info("Using Multi-stage BrokerRequestHandler."); + String reducerHostname = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME); + if (reducerHostname == null) { + // use broker ID as host name, but remove the + String brokerId = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID); + brokerId = 
brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? brokerId.substring( + CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId; + brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId; + reducerHostname = brokerId; + } + _reducerHostname = reducerHostname; + _reducerPort = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); + _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), + CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)), + new WorkerManager(_reducerHostname, _reducerPort, routingManager)); + _queryDispatcher = new QueryDispatcher(); + _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort); + + // TODO: move this to a startUp() function. + _mailboxService.start(); + } + + @Override + public BrokerResponseNative handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity, + RequestContext requestContext) + throws Exception { + long requestId = _requestIdGenerator.incrementAndGet(); + requestContext.setBrokerId(_brokerId); + requestContext.setRequestId(requestId); + requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis()); + + // First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level + // check comes later. 
+ boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity); + if (!hasAccess) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); + LOGGER.info("Access denied for requestId {}", requestId); + requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); + return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); + } + + JsonNode sql = request.get(CommonConstants.Broker.Request.SQL); + if (sql == null) { + throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); + } else { + return handleSQLRequest(requestId, request.get(CommonConstants.Broker.Request.SQL).asText(), request, + requesterIdentity, requestContext); + } + } + + private BrokerResponseNative handleSQLRequest(long requestId, String query, JsonNode request, + @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) + throws Exception { + LOGGER.debug("SQL query for request {}: {}", requestId, query); + requestContext.setQuery(query); + + // Compile the request + long compilationStartTimeNs = System.nanoTime(); + QueryPlan queryPlan; + try { + queryPlan = _queryEnvironment.planQuery(query); + } catch (Exception e) { + LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); + } + + List<DataTable> queryResults = null; + try { + queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, DEFAULT_TIMEOUT_NANO); + } catch (Exception e) { + LOGGER.info("query submission failed", e); + return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + } + + BrokerResponseNative brokerResponse = new 
BrokerResponseNative(); + long executionEndTimeNs = System.nanoTime(); + + // Set total query processing time + long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs); + brokerResponse.setTimeUsedMs(totalTimeMs); + brokerResponse.setResultTable(toResultTable(queryResults)); + requestContext.setQueryProcessingTime(totalTimeMs); + augmentStatistics(requestContext, brokerResponse); + return brokerResponse; + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, + @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, + RequestContext requestContext) + throws Exception { + throw new UnsupportedOperationException(); + } + + private ResultTable toResultTable(List<DataTable> queryResult) { + DataSchema resultDataSchema = null; + List<Object[]> resultRows = new ArrayList<>(); + for (DataTable dataTable : queryResult) { + resultDataSchema = resultDataSchema == null ? 
dataTable.getDataSchema() : resultDataSchema; + int numColumns = resultDataSchema.getColumnNames().length; + DataSchema.ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes(); + List<Object[]> rows = new ArrayList<>(dataTable.getNumberOfRows()); + for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) { + Object[] row = new Object[numColumns]; + Object[] rawRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + for (int i = 0; i < numColumns; i++) { + row[i] = resultColumnDataTypes[i].convertAndFormat(rawRow[i]); + } + rows.add(row); + } + resultRows.addAll(rows); + } + return new ResultTable(resultDataSchema, resultRows); + } + + @Override + public void start() { + // no-op + } + + @Override + public void shutDown() { + _queryDispatcher.shutdown(); + _mailboxService.shutdown(); + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 2e399211cb..97a4b11e3f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -72,6 +72,7 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig) { super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + LOGGER.info("Using Netty BrokerRequestHandler."); _brokerReduceService = new BrokerReduceService(_config); _queryRouter = new QueryRouter(_brokerId, brokerMetrics, nettyConfig, tlsConfig); diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java index 6977638ced..b18046381f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java @@ -91,6 +91,18 @@ public class InstanceUtils { } else { simpleFields.remove(Helix.Instance.ADMIN_PORT_KEY); } + int queryServicePort = instance.getQueryServicePort(); + if (queryServicePort > 0) { + simpleFields.put(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, Integer.toString(queryServicePort)); + } else { + simpleFields.remove(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY); + } + int queryMailboxPort = instance.getQueryMailboxPort(); + if (queryMailboxPort > 0) { + simpleFields.put(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, Integer.toString(queryMailboxPort)); + } else { + simpleFields.remove(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY); + } boolean queriesDisabled = instance.isQueriesDisabled(); if (queriesDisabled) { simpleFields.put(Helix.QUERIES_DISABLED, Boolean.toString(true)); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java index 09cc1821ae..b687c74cbf 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/InstanceUtilsTest.java @@ -39,7 +39,7 @@ public class InstanceUtilsTest { @Test public void testToHelixInstanceConfig() { - Instance instance = new Instance("localhost", 1234, InstanceType.CONTROLLER, null, null, 0, 0, false); + Instance instance = new Instance("localhost", 1234, InstanceType.CONTROLLER, null, null, 0, 0, 0, 0, false); InstanceConfig instanceConfig = 
InstanceUtils.toHelixInstanceConfig(instance); assertEquals(instanceConfig.getInstanceName(), "Controller_localhost_1234"); assertTrue(instanceConfig.getInstanceEnabled()); @@ -50,10 +50,12 @@ public class InstanceUtilsTest { assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)); List<String> tags = Collections.singletonList("DefaultTenant_BROKER"); - instance = new Instance("localhost", 2345, InstanceType.BROKER, tags, null, 0, 0, false); + instance = new Instance("localhost", 2345, InstanceType.BROKER, tags, null, 0, 0, 0, 0, false); instanceConfig = InstanceUtils.toHelixInstanceConfig(instance); assertEquals(instanceConfig.getInstanceName(), "Broker_localhost_2345"); assertTrue(instanceConfig.getInstanceEnabled()); @@ -64,13 +66,15 @@ public class InstanceUtilsTest { assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)); tags = Arrays.asList("T1_OFFLINE", "T2_REALTIME"); Map<String, Integer> poolMap = new TreeMap<>(); poolMap.put("T1_OFFLINE", 0); poolMap.put("T2_REALTIME", 1); - instance = new Instance("localhost", 3456, 
InstanceType.SERVER, tags, poolMap, 123, 234, true); + instance = new Instance("localhost", 3456, InstanceType.SERVER, tags, poolMap, 123, 234, 345, 456, true); instanceConfig = InstanceUtils.toHelixInstanceConfig(instance); assertEquals(instanceConfig.getInstanceName(), "Server_localhost_3456"); assertTrue(instanceConfig.getInstanceEnabled()); @@ -84,10 +88,14 @@ public class InstanceUtilsTest { assertEquals(znRecord.getMapField(InstanceUtils.POOL_KEY), expectedPoolMap); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY), "123"); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY), "234"); + assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY), + "345"); + assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY), + "456"); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED), "true"); tags = Collections.singletonList("minion_untagged"); - instance = new Instance("localhost", 4567, InstanceType.MINION, tags, null, 0, 0, false); + instance = new Instance("localhost", 4567, InstanceType.MINION, tags, null, 0, 0, 0, 0, false); instanceConfig = InstanceUtils.toHelixInstanceConfig(instance); assertEquals(instanceConfig.getInstanceName(), "Minion_localhost_4567"); assertTrue(instanceConfig.getInstanceEnabled()); @@ -98,6 +106,8 @@ public class InstanceUtilsTest { assertNull(znRecord.getMapField(InstanceUtils.POOL_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY)); assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY)); + assertNull(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY)); 
assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)); } @@ -105,7 +115,7 @@ public class InstanceUtilsTest { public void testUpdateHelixInstanceConfig() { Instance instance = new Instance("localhost", 1234, InstanceType.SERVER, Collections.singletonList("DefaultTenant_OFFLINE"), null, - 0, 123, false); + 0, 123, 234, 345, false); InstanceConfig instanceConfig = InstanceUtils.toHelixInstanceConfig(instance); // Put some custom fields, which should not be updated @@ -120,7 +130,7 @@ public class InstanceUtilsTest { Map<String, Integer> poolMap = new TreeMap<>(); poolMap.put("T1_OFFLINE", 0); poolMap.put("T2_REALTIME", 1); - instance = new Instance("myHost", 2345, InstanceType.SERVER, tags, poolMap, 123, 234, true); + instance = new Instance("myHost", 2345, InstanceType.SERVER, tags, poolMap, 123, 234, 345, 456, true); InstanceUtils.updateHelixInstanceConfig(instanceConfig, instance); // Instance name should not change @@ -136,6 +146,10 @@ public class InstanceUtilsTest { assertEquals(znRecord.getMapField(InstanceUtils.POOL_KEY), expectedPoolMap); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.GRPC_PORT_KEY), "123"); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY), "234"); + assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY), + "345"); + assertEquals(znRecord.getSimpleField(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY), + "456"); assertEquals(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED), "true"); // Custom fields should be preserved diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java index 6b4b10b933..87b3659bb5 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java @@ -113,6 +113,8 @@ public class PinotInstanceRestletResource { response.set("pools", JsonUtils.objectToJsonNode(instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY))); response.put("grpcPort", getGrpcPort(instanceConfig)); response.put("adminPort", getAdminPort(instanceConfig)); + response.put("queryServicePort", getQueryServicePort(instanceConfig)); + response.put("queryMailboxPort", getQueryMailboxPort(instanceConfig)); String queriesDisabled = instanceConfig.getRecord().getSimpleField(CommonConstants.Helix.QUERIES_DISABLED); if ("true".equalsIgnoreCase(queriesDisabled)) { response.put(CommonConstants.Helix.QUERIES_DISABLED, "true"); @@ -145,6 +147,32 @@ public class PinotInstanceRestletResource { return Instance.NOT_SET_ADMIN_PORT_VALUE; } + private int getQueryServicePort(InstanceConfig instanceConfig) { + String queryServicePortStr = instanceConfig.getRecord().getSimpleField( + CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY); + if (queryServicePortStr != null) { + try { + return Integer.parseInt(queryServicePortStr); + } catch (Exception e) { + LOGGER.warn("Illegal service port: {} for instance: {}", queryServicePortStr, instanceConfig.getInstanceName()); + } + } + return Instance.NOT_SET_GRPC_PORT_VALUE; + } + + private int getQueryMailboxPort(InstanceConfig instanceConfig) { + String queryMailboxPortStr = instanceConfig.getRecord().getSimpleField( + CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY); + if (queryMailboxPortStr != null) { + try { + return Integer.parseInt(queryMailboxPortStr); + } catch (Exception e) { + LOGGER.warn("Illegal mailbox port: {} for instance: {}", queryMailboxPortStr, instanceConfig.getInstanceName()); + } + } + return Instance.NOT_SET_GRPC_PORT_VALUE; + 
} + private Map<String, String> getSystemResourceInfo(InstanceConfig instanceConfig) { return instanceConfig.getRecord().getMapField(CommonConstants.Helix.Instance.SYSTEM_RESOURCE_INFO_KEY); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java index fd37cd59d7..13ce05ead5 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java @@ -84,10 +84,11 @@ public class PinotInstanceRestletResourceTest { // Create untagged broker and server instances String createInstanceUrl = TEST_INSTANCE.getControllerRequestURLBuilder().forInstanceCreate(); - Instance brokerInstance = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null, 0, 0, false); + Instance brokerInstance = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null, 0, 0, 0, 0, false); ControllerTest.sendPostRequest(createInstanceUrl, brokerInstance.toJsonString()); - Instance serverInstance = new Instance("1.2.3.4", 2345, InstanceType.SERVER, null, null, 8090, 8091, false); + Instance serverInstance = new Instance("1.2.3.4", 2345, InstanceType.SERVER, null, null, 8090, 8091, 8092, 8093, + false); ControllerTest.sendPostRequest(createInstanceUrl, serverInstance.toJsonString()); // Check that we have added two more instances @@ -95,7 +96,8 @@ public class PinotInstanceRestletResourceTest { // Create broker and server instances with tags and pools brokerInstance = - new Instance("2.3.4.5", 1234, InstanceType.BROKER, Collections.singletonList("tag_BROKER"), null, 0, 0, false); + new Instance("2.3.4.5", 1234, InstanceType.BROKER, Collections.singletonList("tag_BROKER"), null, 0, 0, 0, 0, + false); ControllerTest.sendPostRequest(createInstanceUrl, brokerInstance.toJsonString()); 
Map<String, Integer> serverPools = new TreeMap<>(); @@ -103,7 +105,7 @@ public class PinotInstanceRestletResourceTest { serverPools.put("tag_REALTIME", 1); serverInstance = new Instance("2.3.4.5", 2345, InstanceType.SERVER, Arrays.asList("tag_OFFLINE", "tag_REALTIME"), serverPools, - 18090, 18091, false); + 18090, 18091, 18092, 18093, false); ControllerTest.sendPostRequest(createInstanceUrl, serverInstance.toJsonString()); // Check that we have added four instances so far @@ -137,7 +139,8 @@ public class PinotInstanceRestletResourceTest { // Test PUT Instance API String newBrokerTag = "new-broker-tag"; Instance newBrokerInstance = - new Instance("1.2.3.4", 1234, InstanceType.BROKER, Collections.singletonList(newBrokerTag), null, 0, 0, false); + new Instance("1.2.3.4", 1234, InstanceType.BROKER, Collections.singletonList(newBrokerTag), null, 0, 0, 0, 0, + false); String brokerInstanceId = "Broker_1.2.3.4_1234"; String brokerInstanceUrl = TEST_INSTANCE.getControllerRequestURLBuilder().forInstance(brokerInstanceId); ControllerTest.sendPutRequest(brokerInstanceUrl, newBrokerInstance.toJsonString()); @@ -145,7 +148,7 @@ public class PinotInstanceRestletResourceTest { String newServerTag = "new-server-tag"; Instance newServerInstance = new Instance("1.2.3.4", 2345, InstanceType.SERVER, Collections.singletonList(newServerTag), null, 28090, 28091, - true); + 28092, 28093, true); String serverInstanceId = "Server_1.2.3.4_2345"; String serverInstanceUrl = TEST_INSTANCE.getControllerRequestURLBuilder().forInstance(serverInstanceId); ControllerTest.sendPutRequest(serverInstanceUrl, newServerInstance.toJsonString()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index 88311095c0..b2bccec660 100644 --- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -228,7 +228,7 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { String brokerToUntag = taggedBrokers.remove(ThreadLocalRandom.current().nextInt(3)); Instance instance = new Instance("localhost", brokerToUntag.charAt(brokerToUntag.length() - 1) - '0', InstanceType.BROKER, - Collections.singletonList(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE), null, 0, 0, false); + Collections.singletonList(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE), null, 0, 0, 0, 0, false); assertTrue(_helixResourceManager.updateInstance(brokerToUntag, instance, true).isSuccessful()); untaggedBrokers.add(brokerToUntag); checkBrokerResource(taggedBrokers); @@ -241,7 +241,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add a new broker instance Instance newBrokerInstance = - new Instance("localhost", 5, InstanceType.BROKER, Collections.singletonList(brokerTag), null, 0, 0, false); + new Instance("localhost", 5, InstanceType.BROKER, Collections.singletonList(brokerTag), null, 0, 0, 0, 0, + false); assertTrue(_helixResourceManager.addInstance(newBrokerInstance, true).isSuccessful()); String newBrokerId = InstanceUtils.getHelixInstanceId(newBrokerInstance); taggedBrokers.add(newBrokerId); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index b167a18256..455496bbee 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -206,7 +206,7 @@ public 
class PinotHelixResourceManagerTest { // Add new instance. Instance instance = new Instance("localhost", biggerRandomNumber, InstanceType.SERVER, - Collections.singletonList(UNTAGGED_SERVER_INSTANCE), null, 0, 0, false); + Collections.singletonList(UNTAGGED_SERVER_INSTANCE), null, 0, 0, 0, 0, false); TEST_INSTANCE.getHelixResourceManager().addInstance(instance, false); List<String> allInstances = TEST_INSTANCE.getHelixResourceManager().getAllInstances(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index b5b1049f49..149c1b0559 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -27,6 +27,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; public class ServerInstance { + public enum RoutingType { NETTY, GRPC, NETTY_TLS } @@ -40,6 +41,9 @@ public class ServerInstance { private final int _grpcPort; private final int _nettyTlsPort; + private final int _queryServicePort; + private final int _queryMailboxPort; + /** * By default (auto joined instances), server instance name is of format: {@code Server_<hostname>_<port>}, e.g. * {@code Server_localhost_12345}, hostname is of format: {@code Server_<hostname>}, e.g. {@code Server_localhost}. 
@@ -67,6 +71,10 @@ public class ServerInstance { } _grpcPort = instanceConfig.getRecord().getIntField(Helix.Instance.GRPC_PORT_KEY, INVALID_PORT); _nettyTlsPort = instanceConfig.getRecord().getIntField(Helix.Instance.NETTY_TLS_PORT_KEY, INVALID_PORT); + _queryServicePort = instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, + INVALID_PORT); + _queryMailboxPort = instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, + INVALID_PORT); } @VisibleForTesting @@ -76,6 +84,8 @@ public class ServerInstance { _port = port; _grpcPort = INVALID_PORT; _nettyTlsPort = INVALID_PORT; + _queryServicePort = INVALID_PORT; + _queryMailboxPort = INVALID_PORT; } public String getInstanceId() { @@ -93,6 +103,13 @@ public class ServerInstance { public int getGrpcPort() { return _grpcPort; } + public int getQueryServicePort() { + return _queryServicePort; + } + + public int getQueryMailboxPort() { + return _queryMailboxPort; + } public int getNettyTlsPort() { return _nettyTlsPort; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java index c7e6bcfbd5..6bab8a2b83 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerInstance.java @@ -36,16 +36,21 @@ public class WorkerInstance extends ServerInstance { super(instanceConfig); } - public WorkerInstance(String hostname, int serverPort, int mailboxPort) { - super(toInstanceConfig(hostname, serverPort, mailboxPort)); + public WorkerInstance(String hostname, int nettyPort, int grpcPort, int servicePort, int mailboxPort) { + super(toInstanceConfig(hostname, nettyPort, grpcPort, servicePort, mailboxPort)); } - private static InstanceConfig toInstanceConfig(String hostname, int serverPort, int mailboxPort) { - String 
server = String.format("%s%s_%d", CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, serverPort); + private static InstanceConfig toInstanceConfig(String hostname, int nettyPort, int grpcPort, int servicePort, + int mailboxPort) { + String server = String.format("%s%s_%d", CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE, hostname, nettyPort); InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(server); ZNRecord znRecord = instanceConfig.getRecord(); Map<String, String> simpleFields = znRecord.getSimpleFields(); - simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, String.valueOf(mailboxPort)); + simpleFields.put(CommonConstants.Helix.Instance.GRPC_PORT_KEY, String.valueOf(grpcPort)); + simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, + String.valueOf(servicePort)); + simpleFields.put(CommonConstants.Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, + String.valueOf(mailboxPort)); return instanceConfig; } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 74e2036e77..aa19d3bf63 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -66,7 +66,7 @@ public class WorkerManager { } else if (PlannerUtils.isRootStage(stageId)) { // ROOT stage doesn't have a QueryServer as it is strictly only reducing results. // here we simply assign the worker instance with identical server/mailbox port number. 
- stageMetadata.setServerInstances(Lists.newArrayList(new WorkerInstance(_hostName, _port, _port))); + stageMetadata.setServerInstances(Lists.newArrayList(new WorkerInstance(_hostName, _port, _port, _port, _port))); } else { stageMetadata.setServerInstances(filterServers(_routingManager.getEnabledServerInstanceMap().values())); } @@ -76,9 +76,10 @@ public class WorkerManager { List<ServerInstance> serverInstances = new ArrayList<>(); for (ServerInstance server : servers) { String hostname = server.getHostname(); - if (!hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !hostname.startsWith( - CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) && !hostname.startsWith( - CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE) && server.getGrpcPort() > 0) { + if (server.getQueryServicePort() > 0 && server.getQueryMailboxPort() > 0 + && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) + && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE) + && !hostname.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE)) { serverInstances.add(server); } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java index 7585f9bd7c..0993f892f1 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java @@ -90,8 +90,8 @@ public class QueryEnvironmentTestUtils { String server2 = String.format("localhost_%d", port2); // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. // this is only use for test identifier purpose. 
- ServerInstance host1 = new WorkerInstance("localhost", port1, port1); - ServerInstance host2 = new WorkerInstance("localhost", port2, port2); + ServerInstance host1 = new WorkerInstance("localhost", port1, port1, port1, port1); + ServerInstance host2 = new WorkerInstance("localhost", port2, port2, port2, port2); RoutingTable rtA = mock(RoutingTable.class); when(rtA.getServerInstanceToSegmentsMap()).thenReturn( diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index b84d8f40f0..37a203f815 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -125,6 +125,6 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { private String toMailboxId(ServerInstance serverInstance) { return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), serverInstance.getHostname(), - serverInstance.getGrpcPort(), _hostName, _port).toString(); + serverInstance.getQueryMailboxPort(), _hostName, _port).toString(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 8629663b60..3681acdc7d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -215,6 +215,6 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> { private String toMailboxId(ServerInstance serverInstance) { return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), _serverHostName, 
_serverPort, - serverInstance.getHostname(), serverInstance.getGrpcPort()).toString(); + serverInstance.getHostname(), serverInstance.getQueryMailboxPort()).toString(); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java index 034bf561a2..49c2d62dcb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java @@ -59,12 +59,15 @@ public class QueryPlanSerDeUtils { public static ServerInstance stringToInstance(String serverInstanceString) { String[] s = StringUtils.split(serverInstanceString, '_'); - return new WorkerInstance(s[0], Integer.parseInt(s[1]), Integer.parseInt(s[2])); + // Format (see instanceToString): host_port_grpcPort_servicePort_mailboxPort; netty/grpc ports are parsed but unused. + return new WorkerInstance(s[0], Integer.parseInt(s[1]), Integer.parseInt(s[2]), Integer.parseInt(s[3]), + Integer.parseInt(s[4])); } public static String instanceToString(ServerInstance serverInstance) { return StringUtils.join(serverInstance.getHostname(), '_', serverInstance.getPort(), '_', - serverInstance.getGrpcPort()); + serverInstance.getGrpcPort(), '_', serverInstance.getQueryServicePort(), '_', + serverInstance.getQueryMailboxPort()); } public static Map<Integer, StageMetadata> protoMapToStageMetadataMap(Map<Integer, Worker.StageMetadata> protoMap) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index fa9f04808c..260c083ea7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -18,6 +18,7 @@ */ package
org.apache.pinot.query.service; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.ArrayList; import java.util.List; @@ -80,14 +81,15 @@ public class QueryDispatcher { List<ServerInstance> serverInstances = stage.getValue().getServerInstances(); for (ServerInstance serverInstance : serverInstances) { String host = serverInstance.getHostname(); - int port = serverInstance.getPort(); - DispatchClient client = getOrCreateDispatchClient(host, port); + int servicePort = serverInstance.getQueryServicePort(); + int mailboxPort = serverInstance.getQueryMailboxPort(); + DispatchClient client = getOrCreateDispatchClient(host, servicePort); Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder() .setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) .putMetadata("REQUEST_ID", String.valueOf(requestId)) .putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname()) - .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(serverInstance.getGrpcPort())).build()); + .putMetadata("SERVER_INSTANCE_PORT", String.valueOf(mailboxPort)).build()); if (response.containsMetadata("ERROR")) { throw new RuntimeException( String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId, @@ -134,12 +136,21 @@ public class QueryDispatcher { return mailboxReceiveOperator; } + public void shutdown() { + for (DispatchClient dispatchClient : _dispatchClientMap.values()) { + dispatchClient._managedChannel.shutdown(); + } + _dispatchClientMap.clear(); + } + public static class DispatchClient { private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub _blockingStub; + private final ManagedChannel _managedChannel; public DispatchClient(String host, int port) { ManagedChannelBuilder managedChannelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(); - _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(managedChannelBuilder.build()); + 
_managedChannel = managedChannelBuilder.build(); + _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(_managedChannel); } public Worker.QueryResponse submit(Worker.QueryRequest request) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index d133584797..f76917b766 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -87,8 +87,10 @@ public class QueryRunnerTest { server2.start(); // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. // this is only use for test identifier purpose. - _servers.put(new WorkerInstance("localhost", server1.getPort(), server1.getPort()), server1); - _servers.put(new WorkerInstance("localhost", server2.getPort(), server2.getPort()), server2); + int port1 = server1.getPort(); + int port2 = server2.getPort(); + _servers.put(new WorkerInstance("localhost", port1, port1, port1, port1), server1); + _servers.put(new WorkerInstance("localhost", port2, port2, port2, port2), server2); } @AfterClass diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index 735c54dd0c..245f78e651 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -70,7 +70,8 @@ public class QueryServerTest { _queryRunnerMap.put(availablePort, queryRunner); // this only test the QueryServer functionality so the server port can be the same as the mailbox port. // this is only use for test identifier purpose. 
- _queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort)); + _queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort, + availablePort, availablePort)); } List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet()); diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml index d1ab3c83c6..4408b3864d 100644 --- a/pinot-server/pom.xml +++ b/pinot-server/pom.xml @@ -47,6 +47,14 @@ <groupId>org.apache.pinot</groupId> <artifactId>pinot-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-query-planner</artifactId> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-query-runtime</artifactId> + </dependency> <dependency> <groupId>org.apache.pinot</groupId> <artifactId>pinot-core</artifactId> diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java index d630013419..a1f8e910af 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java @@ -96,6 +96,11 @@ public class ServerConf { return _serverConf.getProperty(Server.CONFIG_OF_GRPCTLS_SERVER_ENABLED, Server.DEFAULT_GRPCTLS_SERVER_ENABLED); } + public boolean isMultiStageServerEnabled() { + return _serverConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, + Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED); + } + public boolean isEnableSwagger() { return _serverConf.getProperty(CONFIG_OF_SWAGGER_SERVER_ENABLED, DEFAULT_SWAGGER_SERVER_ENABLED); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 7041a40b24..4103950621 100644 --- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -39,6 +39,7 @@ import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.worker.WorkerQueryServer; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; @@ -63,6 +64,8 @@ public class ServerInstance { private final GrpcQueryServer _grpcQueryServer; private final AccessControl _accessControl; + private final WorkerQueryServer _workerQueryServer; + private boolean _started = false; public ServerInstance(ServerConf serverConf, HelixManager helixManager, AccessControlFactory accessControlFactory) @@ -102,30 +105,42 @@ public class ServerInstance { serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL), helixManager); _accessControl = accessControlFactory.create(); - if (serverConf.isNettyServerEnabled()) { - int nettyPort = serverConf.getNettyPort(); - LOGGER.info("Initializing Netty query server on port: {}", nettyPort); - _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, _serverMetrics, nettyConfig); - } else { + if (serverConf.isMultiStageServerEnabled()) { + // WorkerQueryServer initialization + // because worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport, + // we can't enable any of the other 2 servers + // TODO: decouple server protocol and engine type. 
+ _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(), _instanceDataManager, _serverMetrics); _nettyQueryServer = null; - } - - if (serverConf.isNettyTlsServerEnabled()) { - int nettySecPort = serverConf.getNettyTlsPort(); - LOGGER.info("Initializing TLS-secured Netty query server on port: {}", nettySecPort); - _nettyTlsQueryServer = - new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, nettyConfig, tlsConfig, _accessControl); - } else { _nettyTlsQueryServer = null; - } - if (serverConf.isEnableGrpcServer()) { - int grpcPort = serverConf.getGrpcPort(); - LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); - _grpcQueryServer = new GrpcQueryServer(grpcPort, - serverConf.isGrpcTlsServerEnabled() ? TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), - CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); - } else { _grpcQueryServer = null; + } else { + _workerQueryServer = null; + if (serverConf.isNettyServerEnabled()) { + int nettyPort = serverConf.getNettyPort(); + LOGGER.info("Initializing Netty query server on port: {}", nettyPort); + _nettyQueryServer = new QueryServer(nettyPort, _queryScheduler, _serverMetrics, nettyConfig); + } else { + _nettyQueryServer = null; + } + + if (serverConf.isNettyTlsServerEnabled()) { + int nettySecPort = serverConf.getNettyTlsPort(); + LOGGER.info("Initializing TLS-secured Netty query server on port: {}", nettySecPort); + _nettyTlsQueryServer = + new QueryServer(nettySecPort, _queryScheduler, _serverMetrics, nettyConfig, tlsConfig, _accessControl); + } else { + _nettyTlsQueryServer = null; + } + if (serverConf.isEnableGrpcServer()) { + int grpcPort = serverConf.getGrpcPort(); + LOGGER.info("Initializing gRPC query server on port: {}", grpcPort); + _grpcQueryServer = new GrpcQueryServer(grpcPort, + serverConf.isGrpcTlsServerEnabled() ? 
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(), + CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null, _queryExecutor, _serverMetrics, _accessControl); + } else { + _grpcQueryServer = null; + } } LOGGER.info("Initializing transform functions"); @@ -171,6 +186,10 @@ public class ServerInstance { LOGGER.info("Starting gRPC query server"); _grpcQueryServer.start(); } + if (_workerQueryServer != null) { + LOGGER.info("Starting worker query server"); + _workerQueryServer.start(); + } _started = true; LOGGER.info("Finish starting server instance"); @@ -196,6 +215,10 @@ public class ServerInstance { LOGGER.info("Shutting down Netty query server"); _nettyQueryServer.shutDown(); } + if (_workerQueryServer != null) { + LOGGER.info("Shutting down worker query server"); + _workerQueryServer.shutDown(); + } LOGGER.info("Shutting down query scheduler"); _queryScheduler.stop(); LOGGER.info("Shutting down query executor"); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java new file mode 100644 index 0000000000..711c763c38 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.worker; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.query.runtime.QueryRunner; +import org.apache.pinot.query.service.QueryConfig; +import org.apache.pinot.query.service.QueryServer; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.NetUtils; + + +public class WorkerQueryServer { + private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5; + + private final ExecutorService _executor; + private final int _queryServicePort; + private final PinotConfiguration _configuration; + + private QueryServer _queryWorkerService; + private QueryRunner _queryRunner; + private InstanceDataManager _instanceDataManager; + private ServerMetrics _serverMetrics; + + public WorkerQueryServer(PinotConfiguration configuration, InstanceDataManager instanceDataManager, + ServerMetrics serverMetrics) { + _configuration = toWorkerQueryConfig(configuration); + _instanceDataManager = instanceDataManager; + _serverMetrics = serverMetrics; + _queryServicePort = + _configuration.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT); + _queryRunner = new QueryRunner(); + _queryRunner.init(_configuration, _instanceDataManager, _serverMetrics); + _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner); + _executor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM, + new NamedThreadFactory("worker_query_server_enclosure_on_" + _queryServicePort + "_port")); + } + + private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration configuration) { + 
PinotConfiguration newConfig = new PinotConfiguration(configuration.toMap()); + String hostname = newConfig.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME); + if (hostname == null) { + String instanceId = + newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, NetUtils.getHostnameOrAddress()); + hostname = instanceId.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceId.substring( + CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId; + newConfig.addProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, hostname); + } + int runnerPort = newConfig.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); + if (runnerPort == -1) { + runnerPort = + newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT); + newConfig.addProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, runnerPort); + } + int servicePort = + newConfig.getProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_SERVER_PORT); + if (servicePort == -1) { + servicePort = newConfig.getProperty(CommonConstants.Server.CONFIG_OF_NETTY_PORT, + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT); + newConfig.addProperty(QueryConfig.KEY_OF_QUERY_SERVER_PORT, servicePort); + } + return newConfig; + } + + public int getPort() { + return _queryServicePort; + } + + public void start() { + try { + _queryWorkerService.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void shutDown() { + _queryWorkerService.shutdown(); + _executor.shutdown(); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java index 6e0b502632..a5e5109eef 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/Instance.java @@ -46,6 +46,8 @@ import 
org.apache.pinot.spi.config.BaseJsonConfig; public class Instance extends BaseJsonConfig { public static final int NOT_SET_GRPC_PORT_VALUE = -1; public static final int NOT_SET_ADMIN_PORT_VALUE = -1; + public static final int NOT_SET_QUERY_SERVER_PORT_VALUE = -1; + public static final int NOT_SET_QUERY_MAILBOX_PORT_VALUE = -1; private final String _host; private final int _port; @@ -54,6 +56,8 @@ public class Instance extends BaseJsonConfig { private final Map<String, Integer> _pools; private final int _grpcPort; private final int _adminPort; + private final int _queryServicePort; + private final int _queryMailboxPort; private final boolean _queriesDisabled; @JsonCreator @@ -62,6 +66,7 @@ public class Instance extends BaseJsonConfig { @JsonProperty(value = "type", required = true) InstanceType type, @JsonProperty("tags") @Nullable List<String> tags, @JsonProperty("pools") @Nullable Map<String, Integer> pools, @JsonProperty("grpcPort") int grpcPort, @JsonProperty("adminPort") int adminPort, + @JsonProperty("queryServicePort") int queryServicePort, @JsonProperty("queryMailboxPort") int queryMailboxPort, @JsonProperty("queriesDisabled") boolean queriesDisabled) { Preconditions.checkArgument(host != null, "'host' must be configured"); Preconditions.checkArgument(type != null, "'type' must be configured"); @@ -80,6 +85,16 @@ public class Instance extends BaseJsonConfig { } else { _adminPort = adminPort; } + if (queryServicePort == 0) { + _queryServicePort = NOT_SET_QUERY_SERVER_PORT_VALUE; + } else { + _queryServicePort = queryServicePort; + } + if (queryMailboxPort == 0) { + _queryMailboxPort = NOT_SET_QUERY_MAILBOX_PORT_VALUE; + } else { + _queryMailboxPort = queryMailboxPort; + } _queriesDisabled = queriesDisabled; } @@ -113,6 +128,14 @@ public class Instance extends BaseJsonConfig { return _adminPort; } + public int getQueryServicePort() { + return _queryServicePort; + } + + public int getQueryMailboxPort() { + return _queryMailboxPort; + } + public boolean 
isQueriesDisabled() { return _queriesDisabled; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index d2e6a63f8d..b150a53ad5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -138,6 +138,10 @@ public class CommonConstants { public static final String ADMIN_HTTPS_PORT_KEY = "adminHttpsPort"; public static final String GRPC_PORT_KEY = "grpcPort"; public static final String NETTY_TLS_PORT_KEY = "nettyTlsPort"; + + public static final String MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY = "queryServerPort"; + public static final String MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY = "queryMailboxPort"; + public static final String SYSTEM_RESOURCE_INFO_KEY = "SYSTEM_RESOURCE_INFO"; } @@ -172,6 +176,9 @@ public class CommonConstants { public static final String CONFIG_OF_PINOT_BROKER_STARTABLE_CLASS = "pinot.broker.startable.class"; public static final String CONFIG_OF_PINOT_SERVER_STARTABLE_CLASS = "pinot.server.startable.class"; public static final String CONFIG_OF_PINOT_MINION_STARTABLE_CLASS = "pinot.minion.startable.class"; + + public static final String CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED = "pinot.multistage.engine.enabled"; + public static final boolean DEFAULT_MULTI_STAGE_ENGINE_ENABLED = false; } public static class Broker { @@ -218,11 +225,15 @@ public class CommonConstants { public static final String CONFIG_OF_BROKER_GROUPBY_TRIM_THRESHOLD = "pinot.broker.groupby.trim.threshold"; public static final int DEFAULT_BROKER_GROUPBY_TRIM_THRESHOLD = 1_000_000; + // Configure the request handler type used by broker to handle inbound query requests. + // NOTE: the request handler type refers to the communication between Broker and Server. 
public static final String BROKER_REQUEST_HANDLER_TYPE = "pinot.broker.request.handler.type"; public static final String NETTY_BROKER_REQUEST_HANDLER_TYPE = "netty"; public static final String GRPC_BROKER_REQUEST_HANDLER_TYPE = "grpc"; + public static final String MULTI_STAGE_BROKER_REQUEST_HANDLER_TYPE = "multistage"; public static final String DEFAULT_BROKER_REQUEST_HANDLER_TYPE = NETTY_BROKER_REQUEST_HANDLER_TYPE; + public static final String BROKER_TLS_PREFIX = "pinot.broker.tls"; public static final String BROKER_NETTY_PREFIX = "pinot.broker.netty"; public static final String BROKER_NETTYTLS_ENABLED = "pinot.broker.nettytls.enabled"; @@ -253,6 +264,7 @@ public class CommonConstants { public static final String MIN_SERVER_GROUP_TRIM_SIZE = "minServerGroupTrimSize"; public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose"; + public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine"; // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0 @Deprecated --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org