This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 733cc2fa6b [multistage] Use Different Thread Pool for QueryServer (#10349) 733cc2fa6b is described below commit 733cc2fa6ba7ec4fc022a4a1d951028b2156e43d Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Wed Mar 1 01:18:32 2023 +0530 [multistage] Use Different Thread Pool for QueryServer (#10349) --- .../apache/pinot/query/runtime/QueryRunner.java | 43 +++++++++++++++++++--- .../apache/pinot/query/service/QueryServer.java | 7 ++-- .../pinot/query/service/QueryDispatcherTest.java | 9 +++-- .../pinot/query/service/QueryServerTest.java | 9 +++-- 4 files changed, 53 insertions(+), 15 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 3846e4d424..4c7380943e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.combine.BaseCombineOperator; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; @@ -84,7 +85,32 @@ public class QueryRunner { private String _hostname; private int _port; private VirtualServerAddress _rootServer; - private ExecutorService _executorService; + // Query worker threads are used for (1) running intermediate stage operators (2) running segment level operators + /** + * Query worker threads are used for: + * <ol> + * <li> + * Running intermediate stage operators (v2 engine operators). + * </li> + * <li> + * Running per-segment operators submitted in {@link BaseCombineOperator}. + * </li> + * </ol> + */ + private ExecutorService _queryWorkerExecutorService; + /** + * Query runner threads are used for: + * <ol> + * <li> + * Merging results in BaseCombineOperator for leaf stages. Results are provided by per-segment operators run in + * worker threads + * </li> + * <li> + * Building the OperatorChain and submitting to the scheduler for non-leaf stages (intermediate stages). + * </li> + * </ol> + */ + private ExecutorService _queryRunnerExecutorService; private OpChainSchedulerService _scheduler; /** @@ -103,10 +129,13 @@ public class QueryRunner { try { long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS); - _executorService = Executors.newFixedThreadPool( + _queryWorkerExecutorService = Executors.newFixedThreadPool( ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("query_worker_on_" + _port + "_port")); - _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService); + _queryRunnerExecutorService = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, + new NamedThreadFactory("query_runner_on_" + _port + "_port")); + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService); _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); @@ -173,8 +202,12 @@ public class QueryRunner { } } - public ExecutorService getExecutorService() { - return _executorService; + public ExecutorService getQueryWorkerExecutorService() { + return _queryWorkerExecutorService; + } + + public ExecutorService getQueryRunnerExecutorService() { + return _queryRunnerExecutorService; } private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java index c6a2552a60..86605c4a8a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java @@ -52,12 +52,11 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { public QueryServer(int port, QueryRunner queryRunner) { _server = ServerBuilder.forPort(port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build(); _queryRunner = queryRunner; - _executorService = queryRunner.getExecutorService(); - LOGGER.info("Initialized QueryServer on port: {}", port); + _executorService = queryRunner.getQueryRunnerExecutorService(); } public void start() { - LOGGER.info("Starting QueryWorker"); + LOGGER.info("Starting QueryServer"); try { _queryRunner.start(); _server.start(); @@ -67,7 +66,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { } public void shutdown() { - LOGGER.info("Shutting down QueryWorker"); + LOGGER.info("Shutting down QueryServer"); try { _queryRunner.shutDown(); _server.shutdown(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java index b523013ce4..5d8226d681 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java @@ -44,8 +44,10 @@ import org.testng.annotations.Test; public class QueryDispatcherTest extends QueryTestSet { private static final Random RANDOM_REQUEST_ID_GEN = new Random(); private static final int QUERY_SERVER_COUNT = 2; - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( - ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTestExecutorService")); + private static final ExecutorService WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Worker")); + private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Runner")); private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>(); @@ -59,7 +61,8 @@ public class QueryDispatcherTest extends QueryTestSet { for (int i = 0; i < QUERY_SERVER_COUNT; i++) { int availablePort = QueryTestUtils.getAvailablePort(); QueryRunner queryRunner = Mockito.mock(QueryRunner.class);; - Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE); + Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE); + Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE); QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer.start(); _queryServerMap.put(availablePort, queryServer); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index de19b218eb..4ba37cb707 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -58,8 +58,10 @@ public class QueryServerTest extends QueryTestSet { private static final int QUERY_SERVER_COUNT = 2; private static final String KEY_OF_SERVER_INSTANCE_HOST = "pinot.query.runner.server.hostname"; private static final String KEY_OF_SERVER_INSTANCE_PORT = "pinot.query.runner.server.port"; - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( - ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryServerTestExecutorService")); + private static final ExecutorService WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryServerTest_Worker")); + private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryServerTest_Runner")); private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>(); @@ -74,7 +76,8 @@ public class QueryServerTest extends QueryTestSet { for (int i = 0; i < QUERY_SERVER_COUNT; i++) { int availablePort = QueryTestUtils.getAvailablePort(); QueryRunner queryRunner = Mockito.mock(QueryRunner.class); - Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE); + Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE); + Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE); QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer.start(); _queryServerMap.put(availablePort, queryServer); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org