This is an automated email from the ASF dual-hosted git repository.

vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 61aa6ce468 Reposition query submission spot for adaptive server 
selection (#13327)
61aa6ce468 is described below

commit 61aa6ce468888c1c472a3818c8116cabe7566656
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Sat Jun 8 08:55:28 2024 -0700

    Reposition query submission spot for adaptive server selection (#13327)
    
    * Refactor ADSS querysubmission stats to avoid missing servers
    
    * Address review comments.
---
 .../AdaptiveServerSelectorTest.java                  | 20 ++++++++++----------
 .../pinot/core/transport/AsyncQueryResponse.java     |  6 +++++-
 .../org/apache/pinot/core/transport/QueryRouter.java |  4 ----
 .../routing/stats/ServerRoutingStatsManager.java     |  4 ++--
 .../routing/stats/ServerRoutingStatsManagerTest.java |  6 +++---
 5 files changed, 20 insertions(+), 20 deletions(-)

diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index 926de0699d..427065b76e 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -153,7 +153,7 @@ public class AdaptiveServerSelectorTest {
     }
     for (int ii = 0; ii < 10; ii++) {
       for (String server : _servers) {
-        serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
+        serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
         waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
       }
     }
@@ -187,7 +187,7 @@ public class AdaptiveServerSelectorTest {
 
     for (int ii = 0; ii < _servers.size(); ii++) {
       for (int jj = 0; jj < ii; jj++) {
-        serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(ii));
+        serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(ii));
         waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
       }
     }
@@ -232,15 +232,15 @@ public class AdaptiveServerSelectorTest {
     numInflightReqMap.put("server2", 11);
     numInflightReqMap.put("server3", 15);
     numInflightReqMap.put("server4", 13);
-    serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(0));
+    serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(0));
     waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
-    serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(0));
+    serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(0));
     waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
-    serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(2));
+    serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(2));
     waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
-    serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(2));
+    serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(2));
     waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
-    serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
_servers.get(2));
+    serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
_servers.get(2));
     waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
 
     serverRankingWithVal = selector.fetchAllServerRankingsWithScores();
@@ -290,7 +290,7 @@ public class AdaptiveServerSelectorTest {
 
       // Route the request to the best server.
       selectedServer = serverRankingWithVal.get(0).getLeft();
-      serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
selectedServer);
+      serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
selectedServer);
       waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
       int numReq = numInflightReqMap.get(selectedServer) + 1;
       numInflightReqMap.put(selectedServer, numReq);
@@ -484,7 +484,7 @@ public class AdaptiveServerSelectorTest {
     // TEST 2: Populate all servers with equal numInFlightRequests and 
latencies.
     for (int ii = 0; ii < 10; ii++) {
       for (String server : _servers) {
-        serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server);
+        serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server);
         waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
       }
     }
@@ -571,7 +571,7 @@ public class AdaptiveServerSelectorTest {
 
       // Route the request to the best server.
       selectedServer = serverRankingWithVal.get(0).getLeft();
-      serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, 
selectedServer);
+      serverRoutingStatsManager.recordStatsForQuerySubmission(-1, 
selectedServer);
       waitForStatsUpdate(serverRoutingStatsManager, ++taskCount);
 
       if (rand.nextBoolean()) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index f03509fb54..7bcc90b50d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -56,13 +56,17 @@ public class AsyncQueryResponse implements QueryResponse {
     _requestId = requestId;
     int numServersQueried = serversQueried.size();
     _responseMap = new 
ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
+    _serverRoutingStatsManager = serverRoutingStatsManager;
     for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
+      // Record stats related to query submission just before sending the 
request. Otherwise, if the response is
+      // received immediately, there's a possibility of updating query 
response stats before updating query
+      // submission stats.
+      _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, 
serverRoutingInstance.getInstanceId());
       _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
     }
     _countDownLatch = new CountDownLatch(numServersQueried);
     _timeoutMs = timeoutMs;
     _maxEndTimeMs = startTimeMs + timeoutMs;
-    _serverRoutingStatsManager = serverRoutingStatsManager;
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index 086211ad5f..e5b96840e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -126,10 +126,6 @@ public class QueryRouter {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
       ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? 
_serverChannelsTls : _serverChannels;
       try {
-        // Record stats related to query submission just before sending the 
request. Otherwise, if the response is
-        // received immediately, there's a possibility of updating query 
response stats before updating query
-        // submission stats.
-        _serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, 
serverRoutingInstance.getInstanceId());
         serverChannels.sendRequest(rawTableName, asyncQueryResponse, 
serverRoutingInstance, entry.getValue(),
             timeoutMs);
         asyncQueryResponse.markRequestSubmitted(serverRoutingInstance);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 1057fc084f..a21906d23b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -135,9 +135,9 @@ public class ServerRoutingStatsManager {
   }
 
   /**
-   * Called when a query is submitted to a server. Updates stats corresponding 
to query submission.
+   * Called just before submitting a query to a server. Updates stats 
corresponding to query submission.
    */
-  public void recordStatsAfterQuerySubmission(long requestId, String 
serverInstanceId) {
+  public void recordStatsForQuerySubmission(long requestId, String 
serverInstanceId) {
     if (!_isEnabled) {
       return;
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 84983f87ec..1ef3022a09 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -131,7 +131,7 @@ public class ServerRoutingStatsManagerTest {
     int requestId = 0;
 
     // Submit stats for server1.
-    manager.recordStatsAfterQuerySubmission(requestId++, "server1");
+    manager.recordStatsForQuerySubmission(requestId++, "server1");
     waitForStatsUpdate(manager, requestId);
 
     List<Pair<String, Integer>> numInFlightReqList = 
manager.fetchNumInFlightRequestsForAllServers();
@@ -156,7 +156,7 @@ public class ServerRoutingStatsManagerTest {
     assertEquals(score, 0.0);
 
     // Submit more stats for server 1.
-    manager.recordStatsAfterQuerySubmission(requestId++, "server1");
+    manager.recordStatsForQuerySubmission(requestId++, "server1");
     waitForStatsUpdate(manager, requestId);
 
     numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
@@ -181,7 +181,7 @@ public class ServerRoutingStatsManagerTest {
     assertEquals(score, 0.0);
 
     // Add a new server server2.
-    manager.recordStatsAfterQuerySubmission(requestId++, "server2");
+    manager.recordStatsForQuerySubmission(requestId++, "server2");
     waitForStatsUpdate(manager, requestId);
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to