This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 733cc2fa6b [multistage] Use Different Thread Pool for QueryServer (#10349)
733cc2fa6b is described below

commit 733cc2fa6ba7ec4fc022a4a1d951028b2156e43d
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Wed Mar 1 01:18:32 2023 +0530

    [multistage] Use Different Thread Pool for QueryServer (#10349)
---
 .../apache/pinot/query/runtime/QueryRunner.java    | 43 +++++++++++++++++++---
 .../apache/pinot/query/service/QueryServer.java    |  7 ++--
 .../pinot/query/service/QueryDispatcherTest.java   |  9 +++--
 .../pinot/query/service/QueryServerTest.java       |  9 +++--
 4 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 3846e4d424..4c7380943e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -84,7 +85,32 @@ public class QueryRunner {
   private String _hostname;
   private int _port;
   private VirtualServerAddress _rootServer;
-  private ExecutorService _executorService;
+  // Query worker threads are used for (1) running intermediate stage operators (2) running segment level operators
+  /**
+   * Query worker threads are used for:
+   * <ol>
+   *   <li>
+   *     Running intermediate stage operators (v2 engine operators).
+   *   </li>
+   *   <li>
+   *     Running per-segment operators submitted in {@link BaseCombineOperator}.
+   *   </li>
+   * </ol>
+   */
+  private ExecutorService _queryWorkerExecutorService;
+  /**
+   * Query runner threads are used for:
+   * <ol>
+   *   <li>
+   *     Merging results in BaseCombineOperator for leaf stages. Results are provided by per-segment operators run in
+   *     worker threads
+   *   </li>
+   *   <li>
+   *     Building the OperatorChain and submitting to the scheduler for non-leaf stages (intermediate stages).
+   *   </li>
+   * </ol>
+   */
+  private ExecutorService _queryRunnerExecutorService;
   private OpChainSchedulerService _scheduler;
 
   /**
@@ -103,10 +129,13 @@ public class QueryRunner {
     try {
       long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
           QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
-      _executorService = Executors.newFixedThreadPool(
+      _queryWorkerExecutorService = Executors.newFixedThreadPool(
           ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
           new NamedThreadFactory("query_worker_on_" + _port + "_port"));
-      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService);
+      _queryRunnerExecutorService = Executors.newFixedThreadPool(
+          ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+          new NamedThreadFactory("query_runner_on_" + _port + "_port"));
+      _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _queryWorkerExecutorService);
       _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
@@ -173,8 +202,12 @@ public class QueryRunner {
     }
   }
 
-  public ExecutorService getExecutorService() {
-    return _executorService;
+  public ExecutorService getQueryWorkerExecutorService() {
+    return _queryWorkerExecutorService;
+  }
+
+  public ExecutorService getQueryRunnerExecutorService() {
+    return _queryRunnerExecutorService;
   }
 
   private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index c6a2552a60..86605c4a8a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -52,12 +52,11 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   public QueryServer(int port, QueryRunner queryRunner) {
     _server = ServerBuilder.forPort(port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
     _queryRunner = queryRunner;
-    _executorService = queryRunner.getExecutorService();
-    LOGGER.info("Initialized QueryServer on port: {}", port);
+    _executorService = queryRunner.getQueryRunnerExecutorService();
   }
 
   public void start() {
-    LOGGER.info("Starting QueryWorker");
+    LOGGER.info("Starting QueryServer");
     try {
       _queryRunner.start();
       _server.start();
@@ -67,7 +66,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   }
 
   public void shutdown() {
-    LOGGER.info("Shutting down QueryWorker");
+    LOGGER.info("Shutting down QueryServer");
     try {
       _queryRunner.shutDown();
       _server.shutdown();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index b523013ce4..5d8226d681 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -44,8 +44,10 @@ import org.testng.annotations.Test;
 public class QueryDispatcherTest extends QueryTestSet {
   private static final Random RANDOM_REQUEST_ID_GEN = new Random();
   private static final int QUERY_SERVER_COUNT = 2;
-  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(
-      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTestExecutorService"));
+  private static final ExecutorService WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Worker"));
+  private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Runner"));
 
   private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
   private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -59,7 +61,8 @@ public class QueryDispatcherTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);;
-      Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE);
+      Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+      Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index de19b218eb..4ba37cb707 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -58,8 +58,10 @@ public class QueryServerTest extends QueryTestSet {
   private static final int QUERY_SERVER_COUNT = 2;
   private static final String KEY_OF_SERVER_INSTANCE_HOST = "pinot.query.runner.server.hostname";
   private static final String KEY_OF_SERVER_INSTANCE_PORT = "pinot.query.runner.server.port";
-  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(
-      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryServerTestExecutorService"));
+  private static final ExecutorService WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryServerTest_Worker"));
+  private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryServerTest_Runner"));
 
   private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
   private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
@@ -74,7 +76,8 @@ public class QueryServerTest extends QueryTestSet {
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
-      Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE);
+      Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+      Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
       QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to