This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new eda488f5d7 adding MultiStageEngineQuickStart (#8980) eda488f5d7 is described below commit eda488f5d7aeb68255c3452d00daacffbbbdd477 Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Fri Jul 1 19:59:07 2022 -0700 adding MultiStageEngineQuickStart (#8980) * adding MultiStageEngineQuickStart * address diff comments * add more queries Co-authored-by: Rong Rong <ro...@startree.ai> --- .../tests/MultiStageEngineIntegrationTest.java | 3 +- .../query/runtime/utils/ServerRequestUtils.java | 2 + .../apache/pinot/query/service/QueryConfig.java | 4 +- .../server/starter/helix/BaseServerStarter.java | 5 +- .../pinot/tools/MultistageEngineQuickStart.java | 129 +++++++++++++++++++++ .../tools/admin/command/PostQueryCommand.java | 18 ++- .../tools/admin/command/QuickstartRunner.java | 15 ++- .../tools/admin/command/StartBrokerCommand.java | 17 ++- .../tools/admin/command/StartServerCommand.java | 39 ++++++- .../admin/command/StartServiceManagerCommand.java | 7 +- .../apache/pinot/tools/utils/PinotConfigUtils.java | 12 +- .../baseballStats_offline_table_config.json | 2 +- .../baseballStats_offline_table_config.json | 2 +- 13 files changed, 236 insertions(+), 19 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 0d78d2b182..e5c92c1799 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -115,7 +115,8 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest return new Object[][] { new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1}, new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73}, - new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE WHERE CarrierDelay=15 AND ArrDelay>20", 10, 2}, + new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE" + + " WHERE CarrierDelay=15 AND ArrDelay>20", 172, 2}, new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin " + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146} }; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java index 15a8b0194c..738992fe52 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java @@ -46,6 +46,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils; * conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor. */ public class ServerRequestUtils { + private static final int DEFAULT_LEAF_NODE_LIMIT = 1_000_000; private ServerRequestUtils() { // do not instantiate. @@ -84,6 +85,7 @@ public class ServerRequestUtils { public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan) { PinotQuery pinotQuery = new PinotQuery(); + pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); pinotQuery.setExplain(false); walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery); return pinotQuery; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java index 8de432f3ea..c0bbb08174 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java @@ -23,13 +23,13 @@ package org.apache.pinot.query.service; */ public class QueryConfig { public static final String KEY_OF_QUERY_SERVER_PORT = "pinot.query.server.port"; - public static final int DEFAULT_QUERY_SERVER_PORT = -1; + public static final int DEFAULT_QUERY_SERVER_PORT = 0; public static final String KEY_OF_QUERY_RUNNER_HOSTNAME = "pinot.query.runner.hostname"; public static final String DEFAULT_QUERY_RUNNER_HOSTNAME = "localhost"; // query runner port is the mailbox port. public static final String KEY_OF_QUERY_RUNNER_PORT = "pinot.query.runner.port"; - public static final int DEFAULT_QUERY_RUNNER_PORT = -1; + public static final int DEFAULT_QUERY_RUNNER_PORT = 0; private QueryConfig() { // do not instantiate. diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 8f00c56bfb..c7dafd8a4e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -183,8 +183,9 @@ public abstract class BaseServerStarter implements ServiceStartable { int dataTableVersion = _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, Server.DEFAULT_CURRENT_DATA_TABLE_VERSION); if (dataTableVersion > Server.DEFAULT_CURRENT_DATA_TABLE_VERSION) { - throw new UnsupportedOperationException("Setting experimental DataTable version newer than default via config " - + "is not allowed. Current default DataTable version: " + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION); + LOGGER.warn("Setting experimental DataTable version newer than default via config could result in" + + " backward-compatibility issues. Current default DataTable version: " + + Server.DEFAULT_CURRENT_DATA_TABLE_VERSION); } DataTableFactory.setDataTableVersion(dataTableVersion); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java new file mode 100644 index 0000000000..c7179c6856 --- /dev/null +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tools; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.io.File; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.tools.admin.PinotAdministrator; +import org.apache.pinot.tools.admin.command.QuickstartRunner; + +import static org.apache.pinot.tools.Quickstart.prettyPrintResponse; + + +public class MultistageEngineQuickStart extends QuickStartBase { + + @Override + public List<String> types() { + return Collections.singletonList("MULTI_STAGE"); + } + + @Override + public Map<String, Object> getConfigOverrides() { + Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides()); + overrides.put("pinot.multistage.engine.enabled", "true"); + overrides.put("pinot.server.instance.currentDataTableVersion", 4); + return overrides; + } + + public void execute() + throws Exception { + File quickstartTmpDir = new File(_dataDir, String.valueOf(System.currentTimeMillis())); + + // Baseball stat table + File baseBallStatsBaseDir = new File(quickstartTmpDir, "baseballStats"); + File schemaFile = new File(baseBallStatsBaseDir, "baseballStats_schema.json"); + File tableConfigFile = new File(baseBallStatsBaseDir, "baseballStats_offline_table_config.json"); + File ingestionJobSpecFile = new File(baseBallStatsBaseDir, "ingestionJobSpec.yaml"); + ClassLoader classLoader = Quickstart.class.getClassLoader(); + URL resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_schema.json"); + Preconditions.checkNotNull(resource); + FileUtils.copyURLToFile(resource, schemaFile); + resource = classLoader.getResource("examples/batch/baseballStats/baseballStats_offline_table_config.json"); + Preconditions.checkNotNull(resource); + FileUtils.copyURLToFile(resource, tableConfigFile); + resource = classLoader.getResource("examples/batch/baseballStats/ingestionJobSpec.yaml"); + Preconditions.checkNotNull(resource); + FileUtils.copyURLToFile(resource, ingestionJobSpecFile); + QuickstartTableRequest request = new QuickstartTableRequest(baseBallStatsBaseDir.getAbsolutePath()); + + File tempDir = new File(quickstartTmpDir, "tmp"); + FileUtils.forceMkdir(tempDir); + QuickstartRunner runner = + new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 1, tempDir, getConfigOverrides()); + + printStatus(Quickstart.Color.CYAN, "***** Starting Zookeeper, controller, broker and server *****"); + runner.startAll(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + printStatus(Quickstart.Color.GREEN, "***** Shutting down offline quick start *****"); + runner.stop(); + FileUtils.deleteDirectory(quickstartTmpDir); + } catch (Exception e) { + e.printStackTrace(); + } + })); + printStatus(Quickstart.Color.CYAN, "***** Bootstrap baseballStats table *****"); + runner.bootstrapTable(); + + waitForBootstrapToComplete(null); + + Map<String, String> queryOptions = Collections.singletonMap( + CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true"); + + printStatus(Quickstart.Color.YELLOW, "***** Multi-stage engine quickstart setup complete *****"); + String q1 = "SELECT count(*) FROM baseballStats_OFFLINE limit 1"; + printStatus(Quickstart.Color.YELLOW, "Total number of documents in the table"); + printStatus(Quickstart.Color.CYAN, "Query : " + q1); + printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q1, queryOptions))); + printStatus(Quickstart.Color.GREEN, "***************************************************"); + + String q2 = "SELECT a.playerID, a.runs, a.yearID, b.runs, b.yearID" + + " FROM baseballStats_OFFLINE AS a JOIN baseballStats_OFFLINE AS b ON a.playerID = b.playerID" + + " WHERE a.runs > 160 AND b.runs < 2"; + printStatus(Quickstart.Color.YELLOW, "Correlate the same player(s) with more than 160-run some year(s) and" + + " with less than 2-run some other year(s)"); + printStatus(Quickstart.Color.CYAN, "Query : " + q2); + printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(q2, queryOptions))); + printStatus(Quickstart.Color.GREEN, "***************************************************"); + + printStatus(Quickstart.Color.GREEN, "***************************************************"); + printStatus(Quickstart.Color.YELLOW, "Example query run completed."); + printStatus(Quickstart.Color.GREEN, "***************************************************"); + printStatus(Quickstart.Color.YELLOW, "Please use broker port for executing multistage queries."); + printStatus(Quickstart.Color.GREEN, "***************************************************"); + } + + public static void main(String[] args) + throws Exception { + List<String> arguments = new ArrayList<>(); + arguments.addAll(Arrays.asList("QuickStart", "-type", "MULTI_STAGE")); + arguments.addAll(Arrays.asList(args)); + PinotAdministrator.main(arguments.toArray(new String[arguments.size()])); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java index f004b5f367..aad2e05430 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/PostQueryCommand.java @@ -18,7 +18,8 @@ */ package org.apache.pinot.tools.admin.command; -import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; @@ -62,6 +63,9 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman + "this message.") private boolean _help = false; + @CommandLine.Option(names = {"-o", "-option"}, required = false, description = "Additional options '-o key=value'") + private Map<String, String> _additionalOptions = new HashMap<>(); + private AuthProvider _authProvider; @Override @@ -124,6 +128,11 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman return this; } + public PostQueryCommand setAdditionalOptions(Map<String, String> additionalOptions) { + _additionalOptions.putAll(additionalOptions); + return this; + } + public String run() throws Exception { if (_brokerHost == null) { @@ -131,7 +140,12 @@ public class PostQueryCommand extends AbstractBaseAdminCommand implements Comman } LOGGER.info("Executing command: " + this); String url = _brokerProtocol + "://" + _brokerHost + ":" + _brokerPort + "/query/sql"; - String request = JsonUtils.objectToString(Collections.singletonMap(Request.SQL, _query)); + Map<String, String> payload = new HashMap<>(); + payload.put(Request.SQL, _query); + if (_additionalOptions != null) { + payload.putAll(_additionalOptions); + } + String request = JsonUtils.objectToString(payload); return sendRequest("POST", url, request, makeAuthHeaders(makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password))); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index c656278d85..c1f7cf4b38 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -53,6 +54,10 @@ public class QuickstartRunner { private static final int DEFAULT_SERVER_GRPC_PORT = 7100; private static final int DEFAULT_MINION_PORT = 6000; + private static final int DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT = 8421; + private static final int DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT = 8442; + private static final int DEFAULT_SERVER_MULTISTAGE_SERVER_PORT = 8842; + private static final String DEFAULT_ZK_DIR = "PinotZkDir"; private static final String DEFAULT_CONTROLLER_DIR = "PinotControllerDir"; private static final String DEFAULT_SERVER_DATA_DIR = "PinotServerDataDir"; @@ -132,6 +137,7 @@ public class QuickstartRunner { for (int i = 0; i < _numBrokers; i++) { StartBrokerCommand brokerStarter = new StartBrokerCommand(); brokerStarter.setPort(DEFAULT_BROKER_PORT + i) + .setBrokerMultiStageRunnerPort(DEFAULT_BROKER_MULTISTAGE_RUNNER_PORT + i) .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME) .setConfigOverrides(_configOverrides); if (!brokerStarter.execute()) { @@ -147,6 +153,8 @@ public class QuickstartRunner { StartServerCommand serverStarter = new StartServerCommand(); serverStarter.setPort(DEFAULT_SERVER_NETTY_PORT + i).setAdminPort(DEFAULT_SERVER_ADMIN_API_PORT + i) .setGrpcPort(DEFAULT_SERVER_GRPC_PORT + i) + .setMultiStageServerPort(DEFAULT_SERVER_MULTISTAGE_SERVER_PORT + i) + .setMultiStageRunnerPort(DEFAULT_SERVER_MULTISTAGE_RUNNER_PORT + i) .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME) .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath()) .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath()) @@ -230,10 +238,15 @@ public class QuickstartRunner { public JsonNode runQuery(String query) throws Exception { + return runQuery(query, Collections.emptyMap()); + } + + public JsonNode runQuery(String query, Map<String, String> additionalOptions) + throws Exception { int brokerPort = _brokerPorts.get(RANDOM.nextInt(_brokerPorts.size())); return JsonUtils.stringToJsonNode( new PostQueryCommand().setBrokerPort(String.valueOf(brokerPort)).setAuthProvider(_authProvider) - .setQuery(query).run()); + .setAdditionalOptions(additionalOptions).setQuery(query).run()); } public static void registerDefaultPinotFS() { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java index dffd8e3bdf..e8e5ae7f68 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.tools.Command; @@ -51,6 +52,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm @CommandLine.Option(names = {"-brokerPort"}, required = false, description = "Broker port number to use for query.") private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT; + @CommandLine.Option(names = {"-brokerMultiStageRunnerPort"}, required = false, + description = "Broker port number to use for query.") + private int _brokerMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT; + @CommandLine.Option(names = {"-zkAddress"}, required = false, description = "HTTP address of Zookeeper.") private String _zkAddress = DEFAULT_ZK_ADDRESS; @@ -76,6 +81,10 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm return _brokerPort; } + public int getBrokerMultiStageRunnerPort() { + return _brokerMultiStageRunnerPort; + } + public String getZkAddress() { return _zkAddress; } @@ -125,6 +134,11 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm return this; } + public StartBrokerCommand setBrokerMultiStageRunnerPort(int brokerMultiStageRunnerPort) { + _brokerMultiStageRunnerPort = brokerMultiStageRunnerPort; + return this; + } + public StartBrokerCommand setZkAddress(String zkAddress) { _zkAddress = zkAddress; return this; @@ -171,7 +185,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm _zkAddress = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, _zkAddress); _clusterName = MapUtils.getString(properties, CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, _clusterName); } else { - properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort)); + properties.putAll(PinotConfigUtils.generateBrokerConf(_clusterName, _zkAddress, _brokerHost, _brokerPort, + _brokerMultiStageRunnerPort)); } if (_configOverrides != null) { properties.putAll(_configOverrides); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java index fc85408646..80b2cd70bd 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.ConfigurationException; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.tools.Command; @@ -59,6 +60,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm description = "Port number to serve the grpc query.") private int _serverGrpcPort = CommonConstants.Server.DEFAULT_GRPC_PORT; + @CommandLine.Option(names = {"-serverMultiStageServerPort"}, required = false, + description = "Port number to multi-stage query engine service entrypoint.") + private int _serverMultiStageServerPort = QueryConfig.DEFAULT_QUERY_SERVER_PORT; + + @CommandLine.Option(names = {"-serverMultiStageRunnerPort"}, required = false, + description = "Port number to multi-stage query engine runner communication.") + private int _serverMultiStageRunnerPort = QueryConfig.DEFAULT_QUERY_RUNNER_PORT; + @CommandLine.Option(names = {"-dataDir"}, required = false, description = "Path to directory containing data.") private String _dataDir = PinotConfigUtils.TMP_DIR + "data/pinotServerData"; @@ -100,6 +109,14 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm return _serverGrpcPort; } + public int getServerMultiStageServerPort() { + return _serverMultiStageServerPort; + } + + public int getServerMultiStageRunnerPort() { + return _serverMultiStageRunnerPort; + } + public String getDataDir() { return _dataDir; } @@ -149,6 +166,16 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm return this; } + public StartServerCommand setMultiStageServerPort(int multiStageServerPort) { + _serverMultiStageServerPort = multiStageServerPort; + return this; + } + + public StartServerCommand setMultiStageRunnerPort(int multiStageRunnerPort) { + _serverMultiStageRunnerPort = multiStageRunnerPort; + return this; + } + public StartServerCommand setDataDir(String dataDir) { _dataDir = dataDir; return this; @@ -173,11 +200,15 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm public String toString() { if (_configFileName != null) { return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort - + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -configFileName " - + _configFileName + " -zkAddress " + _zkAddress); + + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + + " -serverMultistageServerPort " + _serverMultiStageServerPort + + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -configFileName " + _configFileName + + " -zkAddress " + _zkAddress); } else { return ("StartServer -clusterName " + _clusterName + " -serverHost " + _serverHost + " -serverPort " + _serverPort - + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + " -dataDir " + _dataDir + + " -serverAdminPort " + _serverAdminPort + " -serverGrpcPort " + _serverGrpcPort + + " -serverMultistageServerPort " + _serverMultiStageServerPort + + " -serverMultistageRunnerPort " + _serverMultiStageRunnerPort + " -dataDir " + _dataDir + " -segmentDir " + _segmentDir + " -zkAddress " + _zkAddress); } } @@ -229,7 +260,7 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm } else { properties.putAll(PinotConfigUtils .generateServerConf(_clusterName, _zkAddress, _serverHost, _serverPort, _serverAdminPort, _serverGrpcPort, - _dataDir, _segmentDir)); + _serverMultiStageServerPort, _serverMultiStageRunnerPort, _dataDir, _segmentDir)); } if (_configOverrides != null) { properties.putAll(_configOverrides); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java index 1a5765faa8..b3274077ac 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServiceManagerCommand.java @@ -31,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.tools.Command; @@ -218,11 +219,13 @@ public class StartServiceManagerCommand extends AbstractBaseAdminCommand impleme ControllerConf.ControllerMode.DUAL, true); case BROKER: return PinotConfigUtils - .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT); + .generateBrokerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT, + QueryConfig.DEFAULT_QUERY_RUNNER_PORT); case SERVER: return PinotConfigUtils .generateServerConf(_clusterName, _zkAddress, null, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT, - CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT, null, null); + CommonConstants.Server.DEFAULT_ADMIN_API_PORT, CommonConstants.Server.DEFAULT_GRPC_PORT, + QueryConfig.DEFAULT_QUERY_SERVER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT, null, null); default: throw new RuntimeException("No default config found for service role: " + serviceRole); } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java index 046b2ca4e4..98b59dc1e7 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/PinotConfigUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf; +import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; @@ -149,7 +150,7 @@ public class PinotConfigUtils { } public static Map<String, Object> generateBrokerConf(String clusterName, String zkAddress, String brokerHost, - int brokerPort) + int brokerPort, int brokerMultiStageRunnerPort) throws SocketException, UnknownHostException { Map<String, Object> properties = new HashMap<>(); properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, clusterName); @@ -157,11 +158,14 @@ public class PinotConfigUtils { properties.put(CommonConstants.Broker.CONFIG_OF_BROKER_HOSTNAME, !StringUtils.isEmpty(brokerHost) ? brokerHost : NetUtils.getHostAddress()); properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, brokerPort != 0 ? brokerPort : getAvailablePort()); + properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, brokerMultiStageRunnerPort != 0 + ? brokerMultiStageRunnerPort : getAvailablePort()); return properties; } public static Map<String, Object> generateServerConf(String clusterName, String zkAddress, String serverHost, - int serverPort, int serverAdminPort, int serverGrpcPort, String serverDataDir, String serverSegmentDir) + int serverPort, int serverAdminPort, int serverGrpcPort, int serverMultiStageServerPort, + int serverMultiStageRunnerPort, String serverDataDir, String serverSegmentDir) throws SocketException, UnknownHostException { if (serverHost == null) { serverHost = NetUtils.getHostAddress(); @@ -183,6 +187,10 @@ public class PinotConfigUtils { properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkAddress); properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST, serverHost); properties.put(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT, serverPort); + properties.put(QueryConfig.KEY_OF_QUERY_SERVER_PORT, serverMultiStageServerPort != 0 + ? serverMultiStageServerPort : getAvailablePort()); + properties.put(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, serverMultiStageRunnerPort != 0 + ? serverMultiStageRunnerPort : getAvailablePort()); properties.put(CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT, serverAdminPort); properties.put(CommonConstants.Server.CONFIG_OF_GRPC_PORT, serverGrpcPort); properties.put(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR, serverDataDir); diff --git a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json index 168c698105..09c86b127b 100644 --- a/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json +++ b/pinot-tools/src/main/resources/examples/batch/baseballStats/baseballStats_offline_table_config.json @@ -4,7 +4,7 @@ "segmentsConfig": { "segmentPushType": "APPEND", "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", - "schemaName": "baseball", + "schemaName": "baseballStats", "replication": "1" }, "tenants": { diff --git a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json index eadbf8641c..b7a079870d 100644 --- a/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json +++ b/pinot-tools/src/main/resources/examples/minions/batch/baseballStats/baseballStats_offline_table_config.json @@ -4,7 +4,7 @@ "segmentsConfig": { "segmentPushType": "APPEND", "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", - "schemaName": "baseball", + "schemaName": "baseballStats", "replication": "1" }, "tenants": { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org