This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch adds-optimization-hotfix
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/adds-optimization-hotfix by this push:
     new 6c188cf24f AdaptiveServerSelection: hotfix commits (#9872)
6c188cf24f is described below

commit 6c188cf24f939634341c3acdb788befda772c6b0
Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com>
AuthorDate: Tue Nov 29 16:39:09 2022 -0500

    AdaptiveServerSelection: hotfix commits (#9872)
    
    * AdaptiveServerSelection: update response stats for servers that have not responded (#9801)
    
    * Optimize AdaptiveServerSelection for replicaGroup based routing (#9803)
---
 .../AdaptiveServerSelector.java                    | 10 +++
 .../adaptiveserverselector/HybridSelector.java     | 28 ++++++++
 .../adaptiveserverselector/LatencySelector.java    | 28 ++++++++
 .../NumInFlightReqSelector.java                    | 26 +++++++
 .../ReplicaGroupInstanceSelector.java              | 23 +++++-
 .../AdaptiveServerSelectorTest.java                | 84 ++++++++++++++++++++++
 .../pinot/core/transport/AsyncQueryResponse.java   | 26 ++++++-
 .../apache/pinot/core/transport/QueryRouter.java   | 13 +---
 .../pinot/core/transport/QueryRoutingTest.java     | 48 ++++++++++++-
 9 files changed, 269 insertions(+), 17 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
index b114c41f59..0f38ee92b1 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
@@ -45,4 +45,14 @@ public interface AdaptiveServerSelector {
    * @return List of servers along with their values ranked from best to worst.
    */
   List<Pair<String, Double>> fetchAllServerRankingsWithScores();
+
+  /**
+   * Same as above but fetches ranking only for the list of serverCandidates 
provided in the parameter. If a server
+   * doesn't have an entry, it's ranked better than other serverCandidates and 
a value of -1.0 is returned. With the
+   * above "fetchAllServerRankingsWithScores" API, ranking for all the servers 
are fetched. This can become
+   * problematic if a broker is routing to multiple  server tenants but a 
query needs to touch only a single server
+   * tenant. This API helps fetch ranking only for a subset of servers.
+   * @return List of servers along with their values ranked from best to worst.
+   */
+  List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> 
serverCandidates);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
index 0aec9a37ad..7b0c8d6248 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.broker.routing.adaptiveserverselector;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 
@@ -85,4 +87,30 @@ public class HybridSelector implements 
AdaptiveServerSelector {
 
     return pairList;
   }
+
+  @Override
+  public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> 
serverCandidates) {
+    List<Pair<String, Double>> pairList = new ArrayList<>();
+    if (serverCandidates.size() == 0) {
+      return pairList;
+    }
+
+    for (String server : serverCandidates) {
+      Double score = 
_serverRoutingStatsManager.fetchHybridScoreForServer(server);
+      if (score == null) {
+        score = -1.0;
+      }
+
+      pairList.add(new ImmutablePair<>(server, score));
+    }
+
+    // Let's shuffle the list before sorting. This helps with randomly 
choosing different servers if there is a tie.
+    Collections.shuffle(pairList);
+    Collections.sort(pairList, (o1, o2) -> {
+      int val = Double.compare(o1.getRight(), o2.getRight());
+      return val;
+    });
+
+    return pairList;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
index 35e553f6dd..1b3a06b2d3 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.broker.routing.adaptiveserverselector;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 
@@ -79,4 +81,30 @@ public class LatencySelector implements 
AdaptiveServerSelector {
 
     return pairList;
   }
+
+  @Override
+  public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> 
serverCandidates) {
+    List<Pair<String, Double>> pairList = new ArrayList<>();
+    if (serverCandidates.size() == 0) {
+      return pairList;
+    }
+
+    for (String server : serverCandidates) {
+      Double score = 
_serverRoutingStatsManager.fetchEMALatencyForServer(server);
+      if (score == null) {
+        score = -1.0;
+      }
+
+      pairList.add(new ImmutablePair<>(server, score));
+    }
+
+    // Let's shuffle the list before sorting. This helps with randomly 
choosing different servers if there is a tie.
+    Collections.shuffle(pairList);
+    Collections.sort(pairList, (o1, o2) -> {
+      int val = Double.compare(o1.getRight(), o2.getRight());
+      return val;
+    });
+
+    return pairList;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
index 480fc5912b..052344b771 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
@@ -85,4 +85,30 @@ public class NumInFlightReqSelector implements 
AdaptiveServerSelector {
 
     return pairList;
   }
+
+  @Override
+  public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> 
serverCandidates) {
+    List<Pair<String, Double>> pairList = new ArrayList<>();
+    if (serverCandidates.size() == 0) {
+      return pairList;
+    }
+
+    for (String server : serverCandidates) {
+      Integer score = 
_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(server);
+      if (score == null) {
+        score = -1;
+      }
+
+      pairList.add(new ImmutablePair<>(server, (double) score));
+    }
+
+    // Let's shuffle the list before sorting. This helps with randomly 
choosing different servers if there is a tie.
+    Collections.shuffle(pairList);
+    Collections.sort(pairList, (o1, o2) -> {
+      int val = Double.compare(o1.getRight(), o2.getRight());
+      return val;
+    });
+
+    return pairList;
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index a9ca487ff5..4389fcccbe 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -20,8 +20,10 @@ package org.apache.pinot.broker.routing.instanceselector;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
@@ -68,14 +70,15 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
       Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, 
String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
 
-
     if (_adaptiveServerSelector != null) {
       // Adaptive Server Selection is enabled.
       List<String> serverRankList = new ArrayList<>();
+      List<String> candidateServers = fetchCandidateServersForQuery(segments, 
segmentToEnabledInstancesMap);
 
       // Fetch serverRankList before looping through all the segments. This is 
important to make sure that we pick
       // the least amount of instances for a query by referring to a single 
snapshot of the rankings.
-      List<Pair<String, Double>> serverRankListWithScores = 
_adaptiveServerSelector.fetchAllServerRankingsWithScores();
+      List<Pair<String, Double>> serverRankListWithScores =
+          
_adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers);
       for (Pair<String, Double> entry : serverRankListWithScores) {
         serverRankList.add(entry.getLeft());
       }
@@ -156,4 +159,20 @@ public class ReplicaGroupInstanceSelector extends 
BaseInstanceSelector {
       segmentToSelectedInstanceMap.put(segment, selectedInstance);
     }
   }
+
+  private List<String> fetchCandidateServersForQuery(List<String> segments,
+      Map<String, List<String>> segmentToEnabledInstancesMap) {
+    List<String> serversList = new ArrayList<>();
+
+    Set<String> tempServerSet = new HashSet<>();
+    for (String segment : segments) {
+      List<String> enabledInstances = 
segmentToEnabledInstancesMap.get(segment);
+      if (enabledInstances != null) {
+        tempServerSet.addAll(enabledInstances);
+      }
+    }
+
+    serversList.addAll(tempServerSet);
+    return serversList;
+  }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index cd2c97dffb..33a8d73a05 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.routing.adaptiveserverselector;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -103,6 +104,12 @@ public class AdaptiveServerSelectorTest {
     List<Pair<String, Double>> serverRankingWithVal = 
selector.fetchAllServerRankingsWithScores();
     assertTrue(serverRankingWithVal.isEmpty());
 
+    // -1.0 will be returned for all servers.
+    serverRankingWithVal = selector.fetchServerRankingsWithScores(_servers);
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      assertEquals(entry.getRight(), -1.0);
+    }
+
     // A random server will be returned if any of the candidate servers do not 
have stats.
     String selectedServer = selector.select(_servers);
     assertTrue(_servers.contains(selectedServer), selectedServer);
@@ -132,6 +139,13 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(_servers);
     assertEquals(selectedServer, _servers.get(0));
 
+    List<String> candidateServers = new ArrayList<>(Arrays.asList("server2", 
"server3"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 2);
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      assertEquals(entry.getRight(), (double) 
numInflightReqMap.get(entry.getLeft()));
+    }
+
     // TEST 3 : Populate all servers with unequal stats.
     // Current numInFlightRequests:
     //   server1 -> 10
@@ -169,6 +183,17 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(Arrays.asList("server3", "server1", 
"server2"));
     assertEquals(selectedServer, "server1");
 
+    candidateServers = new ArrayList<>(Arrays.asList("server4", "server3", 
"server1"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 3);
+    assertEquals(serverRankingWithVal.get(0).getLeft(), "server1");
+    assertEquals(serverRankingWithVal.get(0).getRight(), (double) 
numInflightReqMap.get("server1"));
+    assertEquals(serverRankingWithVal.get(1).getLeft(), "server3");
+    assertEquals(serverRankingWithVal.get(1).getRight(), (double) 
numInflightReqMap.get("server3"));
+    assertEquals(serverRankingWithVal.get(2).getLeft(), "server4");
+    assertEquals(serverRankingWithVal.get(2).getRight(), (double) 
numInflightReqMap.get("server4"));
+
+
     // TEST 4: Populate all servers with unequal stats.
     // Current numInFlightRequests:
     //   server1 -> 12
@@ -209,6 +234,14 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(Arrays.asList("server3", "server1", 
"server2"));
     assertEquals(selectedServer, "server2");
 
+    candidateServers = new ArrayList<>(Arrays.asList("server2", "server1"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 2);
+    assertEquals(serverRankingWithVal.get(0).getLeft(), "server2");
+    assertEquals(serverRankingWithVal.get(0).getRight(), (double) 
numInflightReqMap.get("server2"));
+    assertEquals(serverRankingWithVal.get(1).getLeft(), "server1");
+    assertEquals(serverRankingWithVal.get(1).getRight(), (double) 
numInflightReqMap.get("server1"));
+
     // Test 5: Simulate server selection code. Pick the best server using 
NumInFlightReqSelector during every
     // iteration. Every iteration increases the number of inflight requests 
but decides with the flip of a coin
     // to decrement the number of request on the server. Verify if 
NumInFlightReqSelector chooses the best server
@@ -275,6 +308,12 @@ public class AdaptiveServerSelectorTest {
     String selectedServer = selector.select(_servers);
     assertTrue(_servers.contains(selectedServer), selectedServer);
 
+    List<String> candidateServers = new ArrayList<>(Arrays.asList("server2"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 1);
+    assertEquals(serverRankingWithVal.get(0).getLeft(), "server2");
+    assertEquals(serverRankingWithVal.get(0).getRight(), -1.0);
+
     // TEST 2: Populate all servers with equal latencies.
     for (int ii = 0; ii < 10; ii++) {
       for (String server : _servers) {
@@ -296,6 +335,13 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(_servers);
     assertEquals(selectedServer, _servers.get(0));
 
+    candidateServers = new ArrayList<>(Arrays.asList("server4", "server3"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 2);
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      assertEquals(entry.getRight(), 
latencyMap.get(entry.getLeft()).getAverage());
+    }
+
     // TEST 3: Populate servers with unequal latencies.
     // Latencies added to servers are as follows:
     // server1 -> 1
@@ -331,6 +377,18 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(Arrays.asList("server3", "server4", 
"server2"));
     assertEquals(selectedServer, "server2");
 
+    candidateServers = new ArrayList<>(Arrays.asList("server4", "server1", 
"server3"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 3);
+    prevVal = 0.0;
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      String server = entry.getLeft();
+      double latency = entry.getRight();
+      assertEquals(latency, (double) latencyMap.get(server).getAverage());
+      assertTrue(prevVal <= latency, prevVal + " " + latency + " " + server);
+      prevVal = latency;
+    }
+
     // Test 4: Simulate server selection code. Pick the best server using 
LatencySelector during every iteration.
     // Every iteration updates latency for a server. Verify if LatencySelector 
picks the best server in every iteration.
     for (int ii = 0; ii < 1000; ii++) {
@@ -387,6 +445,14 @@ public class AdaptiveServerSelectorTest {
     String selectedServer = selector.select(_servers);
     assertTrue(_servers.contains(selectedServer), selectedServer);
 
+    List<String> candidateServers = new ArrayList<>(Arrays.asList("server2", 
"server3", "server1", "server4"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 4);
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      assertEquals(entry.getRight(), -1.0);
+    }
+
+
     // TEST 2: Populate all servers with equal numInFlightRequests and 
latencies.
     for (int ii = 0; ii < 10; ii++) {
       for (String server : _servers) {
@@ -413,6 +479,13 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(_servers);
     assertEquals(selectedServer, _servers.get(0));
 
+    candidateServers = new ArrayList<>(Arrays.asList("server1", "server2"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 2);
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      assertEquals(entry.getRight(), serverRankingWithVal.get(0).getRight());
+    }
+
     // Test 3: Populate servers with unequal latencies and numInFlightRequests.
     for (int ii = 0; ii < _servers.size(); ii++) {
       String server = _servers.get(ii);
@@ -441,6 +514,17 @@ public class AdaptiveServerSelectorTest {
     selectedServer = selector.select(Arrays.asList("server3", "server4", 
"server2"));
     assertEquals(selectedServer, "server2");
 
+    candidateServers = new ArrayList<>(Arrays.asList("server1", "server2"));
+    serverRankingWithVal = 
selector.fetchServerRankingsWithScores(candidateServers);
+    assertEquals(serverRankingWithVal.size(), 2);
+    prevVal = 0.0;
+    for (Pair<String, Double> entry : serverRankingWithVal) {
+      String server = entry.getLeft();
+      double latency = entry.getRight();
+      assertTrue(prevVal <= latency, prevVal + " " + latency + " " + server);
+      prevVal = latency;
+    }
+
     // Test 4: Simulate server selection code. Pick the best server using 
HybridSelector during every iteration.
     // Every iteration updates latency and numInFlightRequests for a server. 
Verify if HybridSelector picks the best
     // server in every iteration.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
index 6e9bafeb71..ab32316fdb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
+import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 
 
 /**
@@ -43,12 +44,13 @@ public class AsyncQueryResponse implements QueryResponse {
   private final CountDownLatch _countDownLatch;
   private final long _maxEndTimeMs;
   private final long _timeoutMs;
+  private final ServerRoutingStatsManager _serverRoutingStatsManager;
 
   private volatile ServerRoutingInstance _failedServer;
   private volatile Exception _exception;
 
   public AsyncQueryResponse(QueryRouter queryRouter, long requestId, 
Set<ServerRoutingInstance> serversQueried,
-      long startTimeMs, long timeoutMs) {
+      long startTimeMs, long timeoutMs, ServerRoutingStatsManager 
serverRoutingStatsManager) {
     _queryRouter = queryRouter;
     _requestId = requestId;
     int numServersQueried = serversQueried.size();
@@ -59,6 +61,7 @@ public class AsyncQueryResponse implements QueryResponse {
     _countDownLatch = new CountDownLatch(numServersQueried);
     _timeoutMs = timeoutMs;
     _maxEndTimeMs = startTimeMs + timeoutMs;
+    _serverRoutingStatsManager = serverRoutingStatsManager;
   }
 
   @Override
@@ -84,6 +87,17 @@ public class AsyncQueryResponse implements QueryResponse {
       _status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED : 
Status.TIMED_OUT);
       return _responseMap;
     } finally {
+      // Update ServerRoutingStats.
+      for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : 
_responseMap.entrySet()) {
+        ServerResponse response = entry.getValue();
+        if (response == null || response.getDataTable() == null) {
+          // These are servers from which a response was not received. So 
update query response stats for such
+          // servers with maximum latency i.e timeout value.
+          
_serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, 
entry.getKey().getInstanceId(),
+              _timeoutMs);
+        }
+      }
+
       _queryRouter.markQueryDone(_requestId);
     }
   }
@@ -134,7 +148,15 @@ public class AsyncQueryResponse implements QueryResponse {
 
   void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable 
dataTable, int responseSize,
       int deserializationTimeMs) {
-    _responseMap.get(serverRoutingInstance).receiveDataTable(dataTable, 
responseSize, deserializationTimeMs);
+    ServerResponse response = _responseMap.get(serverRoutingInstance);
+    response.receiveDataTable(dataTable, responseSize, deserializationTimeMs);
+
+    // Record query completion stats immediately after receiving the response 
from the server instead of waiting
+    // for all servers to respond. This helps to keep the stats up-to-date.
+    long latencyMs = response.getResponseDelayMs();
+    _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, 
serverRoutingInstance.getInstanceId(),
+        latencyMs);
+
     _numServersResponded.getAndIncrement();
     _countDownLatch.countDown();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index f2cb638d00..68c58088c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -120,7 +120,8 @@ public class QueryRouter {
 
     // Create the asynchronous query response with the request map
     AsyncQueryResponse asyncQueryResponse =
-        new AsyncQueryResponse(this, requestId, requestMap.keySet(), 
System.currentTimeMillis(), timeoutMs);
+        new AsyncQueryResponse(this, requestId, requestMap.keySet(), 
System.currentTimeMillis(), timeoutMs,
+            _serverRoutingStatsManager);
     _asyncQueryResponseMap.put(requestId, asyncQueryResponse);
     for (Map.Entry<ServerRoutingInstance, InstanceRequest> entry : 
requestMap.entrySet()) {
       ServerRoutingInstance serverRoutingInstance = entry.getKey();
@@ -153,8 +154,6 @@ public class QueryRouter {
       AsyncQueryResponse asyncQueryResponse, Exception e) {
     LOGGER.error("Caught exception while sending request {} to server: {}, 
marking query failed", requestId,
         serverRoutingInstance, e);
-    _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, 
serverRoutingInstance.getInstanceId(),
-        (int) asyncQueryResponse.getTimeoutMs());
     asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
   }
 
@@ -188,20 +187,12 @@ public class QueryRouter {
     // Query future might be null if the query is already done (maybe due to 
failure)
     if (asyncQueryResponse != null) {
       asyncQueryResponse.receiveDataTable(serverRoutingInstance, dataTable, 
responseSize, deserializationTimeMs);
-
-      // Record query completion stats immediately after receiving the 
response from the server instead of waiting
-      // for the reduce phase.
-      long latencyMs = 
asyncQueryResponse.getServerResponseDelayMs(serverRoutingInstance);
-      _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, 
serverRoutingInstance.getInstanceId(),
-          latencyMs);
     }
   }
 
   void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception 
exception) {
     for (AsyncQueryResponse asyncQueryResponse : 
_asyncQueryResponseMap.values()) {
       asyncQueryResponse.markServerDown(serverRoutingInstance, exception);
-      
_serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(),
-          serverRoutingInstance.getInstanceId(), (int) 
asyncQueryResponse.getTimeoutMs());
     }
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 9fdaf7fb7d..5e1fa04176 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.transport;
 
 import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datatable.DataTable;
@@ -33,7 +34,9 @@ import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -60,10 +63,18 @@ public class QueryRoutingTest {
       Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
 
   private QueryRouter _queryRouter;
+  private ServerRoutingStatsManager _serverRoutingStatsManager;
+  int _requestCount;
 
   @BeforeClass
   public void setUp() {
-    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), 
mock(ServerRoutingStatsManager.class));
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
 true);
+    PinotConfiguration cfg = new PinotConfiguration(properties);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    _serverRoutingStatsManager.init();
+    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), 
_serverRoutingStatsManager);
+    _requestCount = 0;
   }
 
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes) {
@@ -88,6 +99,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -95,13 +107,17 @@ public class QueryRoutingTest {
 
     // OFFLINE only
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 1_000L);
+        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, 
ROUTING_TABLE, null, null, 600_000L);
     Map<ServerRoutingInstance, ServerResponse> response = 
asyncQueryResponse.getFinalResponses();
     assertEquals(response.size(), 1);
     assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = 
response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    // 2 requests - query submit and query response.
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // REALTIME only
     asyncQueryResponse =
@@ -112,6 +128,9 @@ public class QueryRoutingTest {
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Hybrid
     asyncQueryResponse =
@@ -127,6 +146,9 @@ public class QueryRoutingTest {
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
     assertNotNull(serverResponse.getDataTable());
     assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    _requestCount += 4;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -136,6 +158,7 @@ public class QueryRoutingTest {
   public void testInvalidResponse()
       throws Exception {
     long requestId = 123;
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, new byte[0]);
@@ -154,6 +177,9 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
     assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -166,6 +192,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(0, responseBytes);
@@ -184,6 +211,9 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
     assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
 
     // Shut down the server
     queryServer.shutDown();
@@ -199,6 +229,7 @@ public class QueryRoutingTest {
     DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
+    String serverId = SERVER_INSTANCE.getInstanceId();
 
     // Start the server
     QueryServer queryServer = getQueryServer(500, responseBytes);
@@ -221,6 +252,10 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
     assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
+
 
     // Submit query after server is down
     startTimeMs = System.currentTimeMillis();
@@ -237,6 +272,15 @@ public class QueryRoutingTest {
     assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
     assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs);
+    _requestCount += 2;
+    waitForStatsUpdate(_requestCount);
+    
assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(),
 0);
+  }
+
+  private void waitForStatsUpdate(long taskCount) {
+    TestUtils.waitForCondition(aVoid -> {
+      return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount);
+    }, 5L, 5000, "Failed to record stats for AdaptiveServerSelectorTest");
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to