This is an automated email from the ASF dual-hosted git repository. vvivekiyer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 61aa6ce468 Reposition query submission spot for adaptive server selection (#13327) 61aa6ce468 is described below commit 61aa6ce468888c1c472a3818c8116cabe7566656 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Sat Jun 8 08:55:28 2024 -0700 Reposition query submission spot for adaptive server selection (#13327) * Refactor ADSS querysubmission stats to avoid missing servers * Address review comments. --- .../AdaptiveServerSelectorTest.java | 20 ++++++++++---------- .../pinot/core/transport/AsyncQueryResponse.java | 6 +++++- .../org/apache/pinot/core/transport/QueryRouter.java | 4 ---- .../routing/stats/ServerRoutingStatsManager.java | 4 ++-- .../routing/stats/ServerRoutingStatsManagerTest.java | 6 +++--- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java index 926de0699d..427065b76e 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java @@ -153,7 +153,7 @@ public class AdaptiveServerSelectorTest { } for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -187,7 +187,7 @@ public class AdaptiveServerSelectorTest { for (int ii = 0; ii < _servers.size(); ii++) { for (int jj = 0; jj < ii; jj++) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(ii)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(ii)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -232,15 +232,15 @@ public class AdaptiveServerSelectorTest { numInflightReqMap.put("server2", 11); numInflightReqMap.put("server3", 15); numInflightReqMap.put("server4", 13); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(0)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(0)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, _servers.get(2)); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, _servers.get(2)); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); serverRankingWithVal = selector.fetchAllServerRankingsWithScores(); @@ -290,7 +290,7 @@ public class AdaptiveServerSelectorTest { // Route the request to the best server. selectedServer = serverRankingWithVal.get(0).getLeft(); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); int numReq = numInflightReqMap.get(selectedServer) + 1; numInflightReqMap.put(selectedServer, numReq); @@ -484,7 +484,7 @@ public class AdaptiveServerSelectorTest { // TEST 2: Populate all servers with equal numInFlightRequests and latencies. for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, server); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, server); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); } } @@ -571,7 +571,7 @@ public class AdaptiveServerSelectorTest { // Route the request to the best server. selectedServer = serverRankingWithVal.get(0).getLeft(); - serverRoutingStatsManager.recordStatsAfterQuerySubmission(-1, selectedServer); + serverRoutingStatsManager.recordStatsForQuerySubmission(-1, selectedServer); waitForStatsUpdate(serverRoutingStatsManager, ++taskCount); if (rand.nextBoolean()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index f03509fb54..7bcc90b50d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -56,13 +56,17 @@ public class AsyncQueryResponse implements QueryResponse { _requestId = requestId; int numServersQueried = serversQueried.size(); _responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried)); + _serverRoutingStatsManager = serverRoutingStatsManager; for (ServerRoutingInstance serverRoutingInstance : serversQueried) { + // Record stats related to query submission just before sending the request. Otherwise, if the response is + // received immediately, there's a possibility of updating query response stats before updating query + // submission stats. + _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs)); } _countDownLatch = new CountDownLatch(numServersQueried); _timeoutMs = timeoutMs; _maxEndTimeMs = startTimeMs + timeoutMs; - _serverRoutingStatsManager = serverRoutingStatsManager; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 086211ad5f..e5b96840e7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -126,10 +126,6 @@ public class QueryRouter { ServerRoutingInstance serverRoutingInstance = entry.getKey(); ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels; try { - // Record stats related to query submission just before sending the request. Otherwise, if the response is - // received immediately, there's a possibility of updating query response stats before updating query - // submission stats. - _serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); serverChannels.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, entry.getValue(), timeoutMs); asyncQueryResponse.markRequestSubmitted(serverRoutingInstance); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java index 1057fc084f..a21906d23b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java @@ -135,9 +135,9 @@ public class ServerRoutingStatsManager { } /** - * Called when a query is submitted to a server. Updates stats corresponding to query submission. + * Called just before submitting a query to a server. Updates stats corresponding to query submission. */ - public void recordStatsAfterQuerySubmission(long requestId, String serverInstanceId) { + public void recordStatsForQuerySubmission(long requestId, String serverInstanceId) { if (!_isEnabled) { return; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java index 84983f87ec..1ef3022a09 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java @@ -131,7 +131,7 @@ public class ServerRoutingStatsManagerTest { int requestId = 0; // Submit stats for server1. - manager.recordStatsAfterQuerySubmission(requestId++, "server1"); + manager.recordStatsForQuerySubmission(requestId++, "server1"); waitForStatsUpdate(manager, requestId); List<Pair<String, Integer>> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers(); @@ -156,7 +156,7 @@ public class ServerRoutingStatsManagerTest { assertEquals(score, 0.0); // Submit more stats for server 1. - manager.recordStatsAfterQuerySubmission(requestId++, "server1"); + manager.recordStatsForQuerySubmission(requestId++, "server1"); waitForStatsUpdate(manager, requestId); numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers(); @@ -181,7 +181,7 @@ public class ServerRoutingStatsManagerTest { assertEquals(score, 0.0); // Add a new server server2. - manager.recordStatsAfterQuerySubmission(requestId++, "server2"); + manager.recordStatsForQuerySubmission(requestId++, "server2"); waitForStatsUpdate(manager, requestId); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org