This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch adds-optimization-hotfix in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/adds-optimization-hotfix by this push: new 6c188cf24f AdaptiveServerSelection: hotfix commits (#9872) 6c188cf24f is described below commit 6c188cf24f939634341c3acdb788befda772c6b0 Author: Vivek Iyer Vaidyanathan <vviveki...@gmail.com> AuthorDate: Tue Nov 29 16:39:09 2022 -0500 AdaptiveServerSelection: hotfix commits (#9872) * AdaptiveServerSelection: update response stats for servers that have not responded (#9801) * Optimize AdaptiveServerSelection for replicaGroup based routing (#9803) --- .../AdaptiveServerSelector.java | 10 +++ .../adaptiveserverselector/HybridSelector.java | 28 ++++++++ .../adaptiveserverselector/LatencySelector.java | 28 ++++++++ .../NumInFlightReqSelector.java | 26 +++++++ .../ReplicaGroupInstanceSelector.java | 23 +++++- .../AdaptiveServerSelectorTest.java | 84 ++++++++++++++++++++++ .../pinot/core/transport/AsyncQueryResponse.java | 26 ++++++- .../apache/pinot/core/transport/QueryRouter.java | 13 +--- .../pinot/core/transport/QueryRoutingTest.java | 48 ++++++++++++- 9 files changed, 269 insertions(+), 17 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java index b114c41f59..0f38ee92b1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java @@ -45,4 +45,14 @@ public interface AdaptiveServerSelector { * @return List of servers along with their values ranked from best to worst. */ List<Pair<String, Double>> fetchAllServerRankingsWithScores(); + + /** + * Same as above but fetches ranking only for the list of serverCandidates provided in the parameter. 
If a server + * doesn't have an entry, it's ranked better than other serverCandidates and a value of -1.0 is returned. With the + * above "fetchAllServerRankingsWithScores" API, rankings for all the servers are fetched. This can become + * problematic if a broker is routing to multiple server tenants but a query needs to touch only a single server + * tenant. This API fetches rankings only for a subset of servers. + * @return List of servers along with their values ranked from best to worst. + */ + List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> serverCandidates); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java index 0aec9a37ad..7b0c8d6248 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.broker.routing.adaptiveserverselector; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -85,4 +87,30 @@ public class HybridSelector implements AdaptiveServerSelector { return pairList; } + + @Override + public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> serverCandidates) { + List<Pair<String, Double>> pairList = new ArrayList<>(); + if (serverCandidates.size() == 0) { + return pairList; + } + + for (String server : serverCandidates) { + Double score = _serverRoutingStatsManager.fetchHybridScoreForServer(server); + if (score == null) { + score = -1.0; + } + + pairList.add(new ImmutablePair<>(server, 
score)); + } + + // Let's shuffle the list before sorting. This helps with randomly choosing different servers if there is a tie. + Collections.shuffle(pairList); + Collections.sort(pairList, (o1, o2) -> { + int val = Double.compare(o1.getRight(), o2.getRight()); + return val; + }); + + return pairList; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java index 35e553f6dd..1b3a06b2d3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java @@ -18,9 +18,11 @@ */ package org.apache.pinot.broker.routing.adaptiveserverselector; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -79,4 +81,30 @@ public class LatencySelector implements AdaptiveServerSelector { return pairList; } + + @Override + public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> serverCandidates) { + List<Pair<String, Double>> pairList = new ArrayList<>(); + if (serverCandidates.size() == 0) { + return pairList; + } + + for (String server : serverCandidates) { + Double score = _serverRoutingStatsManager.fetchEMALatencyForServer(server); + if (score == null) { + score = -1.0; + } + + pairList.add(new ImmutablePair<>(server, score)); + } + + // Let's shuffle the list before sorting. This helps with randomly choosing different servers if there is a tie. 
+ Collections.shuffle(pairList); + Collections.sort(pairList, (o1, o2) -> { + int val = Double.compare(o1.getRight(), o2.getRight()); + return val; + }); + + return pairList; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java index 480fc5912b..052344b771 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java @@ -85,4 +85,30 @@ public class NumInFlightReqSelector implements AdaptiveServerSelector { return pairList; } + + @Override + public List<Pair<String, Double>> fetchServerRankingsWithScores(List<String> serverCandidates) { + List<Pair<String, Double>> pairList = new ArrayList<>(); + if (serverCandidates.size() == 0) { + return pairList; + } + + for (String server : serverCandidates) { + Integer score = _serverRoutingStatsManager.fetchNumInFlightRequestsForServer(server); + if (score == null) { + score = -1; + } + + pairList.add(new ImmutablePair<>(server, (double) score)); + } + + // Let's shuffle the list before sorting. This helps with randomly choosing different servers if there is a tie. 
+ Collections.shuffle(pairList); + Collections.sort(pairList, (o1, o2) -> { + int val = Double.compare(o1.getRight(), o2.getRight()); + return val; + }); + + return pairList; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java index a9ca487ff5..4389fcccbe 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java @@ -20,8 +20,10 @@ package org.apache.pinot.broker.routing.instanceselector; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector; @@ -68,14 +70,15 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); - if (_adaptiveServerSelector != null) { // Adaptive Server Selection is enabled. List<String> serverRankList = new ArrayList<>(); + List<String> candidateServers = fetchCandidateServersForQuery(segments, segmentToEnabledInstancesMap); // Fetch serverRankList before looping through all the segments. This is important to make sure that we pick // the least amount of instances for a query by referring to a single snapshot of the rankings. 
- List<Pair<String, Double>> serverRankListWithScores = _adaptiveServerSelector.fetchAllServerRankingsWithScores(); + List<Pair<String, Double>> serverRankListWithScores = + _adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers); for (Pair<String, Double> entry : serverRankListWithScores) { serverRankList.add(entry.getLeft()); } @@ -156,4 +159,20 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { segmentToSelectedInstanceMap.put(segment, selectedInstance); } } + + private List<String> fetchCandidateServersForQuery(List<String> segments, + Map<String, List<String>> segmentToEnabledInstancesMap) { + List<String> serversList = new ArrayList<>(); + + Set<String> tempServerSet = new HashSet<>(); + for (String segment : segments) { + List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment); + if (enabledInstances != null) { + tempServerSet.addAll(enabledInstances); + } + } + + serversList.addAll(tempServerSet); + return serversList; + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java index cd2c97dffb..33a8d73a05 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.broker.routing.adaptiveserverselector; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -103,6 +104,12 @@ public class AdaptiveServerSelectorTest { List<Pair<String, Double>> serverRankingWithVal = selector.fetchAllServerRankingsWithScores(); assertTrue(serverRankingWithVal.isEmpty()); + // -1.0 will be returned for all servers. 
+ serverRankingWithVal = selector.fetchServerRankingsWithScores(_servers); + for (Pair<String, Double> entry : serverRankingWithVal) { + assertEquals(entry.getRight(), -1.0); + } + // A random server will be returned if any of the candidate servers do not have stats. String selectedServer = selector.select(_servers); assertTrue(_servers.contains(selectedServer), selectedServer); @@ -132,6 +139,13 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(_servers); assertEquals(selectedServer, _servers.get(0)); + List<String> candidateServers = new ArrayList<>(Arrays.asList("server2", "server3")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 2); + for (Pair<String, Double> entry : serverRankingWithVal) { + assertEquals(entry.getRight(), (double) numInflightReqMap.get(entry.getLeft())); + } + // TEST 3 : Populate all servers with unequal stats. // Current numInFlightRequests: // server1 -> 10 @@ -169,6 +183,17 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(Arrays.asList("server3", "server1", "server2")); assertEquals(selectedServer, "server1"); + candidateServers = new ArrayList<>(Arrays.asList("server4", "server3", "server1")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 3); + assertEquals(serverRankingWithVal.get(0).getLeft(), "server1"); + assertEquals(serverRankingWithVal.get(0).getRight(), (double) numInflightReqMap.get("server1")); + assertEquals(serverRankingWithVal.get(1).getLeft(), "server3"); + assertEquals(serverRankingWithVal.get(1).getRight(), (double) numInflightReqMap.get("server3")); + assertEquals(serverRankingWithVal.get(2).getLeft(), "server4"); + assertEquals(serverRankingWithVal.get(2).getRight(), (double) numInflightReqMap.get("server4")); + + // TEST 4: Populate all servers with unequal stats. 
// Current numInFlightRequests: // server1 -> 12 @@ -209,6 +234,14 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(Arrays.asList("server3", "server1", "server2")); assertEquals(selectedServer, "server2"); + candidateServers = new ArrayList<>(Arrays.asList("server2", "server1")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 2); + assertEquals(serverRankingWithVal.get(0).getLeft(), "server2"); + assertEquals(serverRankingWithVal.get(0).getRight(), (double) numInflightReqMap.get("server2")); + assertEquals(serverRankingWithVal.get(1).getLeft(), "server1"); + assertEquals(serverRankingWithVal.get(1).getRight(), (double) numInflightReqMap.get("server1")); + // Test 5: Simulate server selection code. Pick the best server using NumInFlightReqSelector during every // iteration. Every iteration increases the number of inflight requests but decides with the flip of a coin // to decrement the number of request on the server. Verify if NumInFlightReqSelector chooses the best server @@ -275,6 +308,12 @@ public class AdaptiveServerSelectorTest { String selectedServer = selector.select(_servers); assertTrue(_servers.contains(selectedServer), selectedServer); + List<String> candidateServers = new ArrayList<>(Arrays.asList("server2")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 1); + assertEquals(serverRankingWithVal.get(0).getLeft(), "server2"); + assertEquals(serverRankingWithVal.get(0).getRight(), -1.0); + // TEST 2: Populate all servers with equal latencies. 
for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { @@ -296,6 +335,13 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(_servers); assertEquals(selectedServer, _servers.get(0)); + candidateServers = new ArrayList<>(Arrays.asList("server4", "server3")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 2); + for (Pair<String, Double> entry : serverRankingWithVal) { + assertEquals(entry.getRight(), latencyMap.get(entry.getLeft()).getAverage()); + } + // TEST 3: Populate servers with unequal latencies. // Latencies added to servers are as follows: // server1 -> 1 @@ -331,6 +377,18 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(Arrays.asList("server3", "server4", "server2")); assertEquals(selectedServer, "server2"); + candidateServers = new ArrayList<>(Arrays.asList("server4", "server1", "server3")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 3); + prevVal = 0.0; + for (Pair<String, Double> entry : serverRankingWithVal) { + String server = entry.getLeft(); + double latency = entry.getRight(); + assertEquals(latency, (double) latencyMap.get(server).getAverage()); + assertTrue(prevVal <= latency, prevVal + " " + latency + " " + server); + prevVal = latency; + } + // Test 4: Simulate server selection code. Pick the best server using LatencySelector during every iteration. // Every iteration updates latency for a server. Verify if LatencySelector picks the best server in every iteration. 
for (int ii = 0; ii < 1000; ii++) { @@ -387,6 +445,14 @@ public class AdaptiveServerSelectorTest { String selectedServer = selector.select(_servers); assertTrue(_servers.contains(selectedServer), selectedServer); + List<String> candidateServers = new ArrayList<>(Arrays.asList("server2", "server3", "server1", "server4")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 4); + for (Pair<String, Double> entry : serverRankingWithVal) { + assertEquals(entry.getRight(), -1.0); + } + + // TEST 2: Populate all servers with equal numInFlightRequests and latencies. for (int ii = 0; ii < 10; ii++) { for (String server : _servers) { @@ -413,6 +479,13 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(_servers); assertEquals(selectedServer, _servers.get(0)); + candidateServers = new ArrayList<>(Arrays.asList("server1", "server2")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 2); + for (Pair<String, Double> entry : serverRankingWithVal) { + assertEquals(entry.getRight(), serverRankingWithVal.get(0).getRight()); + } + // Test 3: Populate servers with unequal latencies and numInFlightRequests. 
for (int ii = 0; ii < _servers.size(); ii++) { String server = _servers.get(ii); @@ -441,6 +514,17 @@ public class AdaptiveServerSelectorTest { selectedServer = selector.select(Arrays.asList("server3", "server4", "server2")); assertEquals(selectedServer, "server2"); + candidateServers = new ArrayList<>(Arrays.asList("server1", "server2")); + serverRankingWithVal = selector.fetchServerRankingsWithScores(candidateServers); + assertEquals(serverRankingWithVal.size(), 2); + prevVal = 0.0; + for (Pair<String, Double> entry : serverRankingWithVal) { + String server = entry.getLeft(); + double latency = entry.getRight(); + assertTrue(prevVal <= latency, prevVal + " " + latency + " " + server); + prevVal = latency; + } + // Test 4: Simulate server selection code. Pick the best server using HybridSelector during every iteration. // Every iteration updates latency and numInFlightRequests for a server. Verify if HybridSelector picks the best // server in every iteration. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index 6e9bafeb71..ab32316fdb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; /** @@ -43,12 +44,13 @@ public class AsyncQueryResponse implements QueryResponse { private final CountDownLatch _countDownLatch; private final long _maxEndTimeMs; private final long _timeoutMs; + private final ServerRoutingStatsManager _serverRoutingStatsManager; private volatile ServerRoutingInstance _failedServer; private volatile Exception 
_exception; public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRoutingInstance> serversQueried, - long startTimeMs, long timeoutMs) { + long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) { _queryRouter = queryRouter; _requestId = requestId; int numServersQueried = serversQueried.size(); @@ -59,6 +61,7 @@ public class AsyncQueryResponse implements QueryResponse { _countDownLatch = new CountDownLatch(numServersQueried); _timeoutMs = timeoutMs; _maxEndTimeMs = startTimeMs + timeoutMs; + _serverRoutingStatsManager = serverRoutingStatsManager; } @Override @@ -84,6 +87,17 @@ public class AsyncQueryResponse implements QueryResponse { _status.compareAndSet(Status.IN_PROGRESS, finish ? Status.COMPLETED : Status.TIMED_OUT); return _responseMap; } finally { + // Update ServerRoutingStats. + for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : _responseMap.entrySet()) { + ServerResponse response = entry.getValue(); + if (response == null || response.getDataTable() == null) { + // These are servers from which a response was not received. So update query response stats for such + // servers with the maximum latency, i.e., the timeout value. + _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), + _timeoutMs); + } + } + + _queryRouter.markQueryDone(_requestId); } } @@ -134,7 +148,15 @@ public class AsyncQueryResponse implements QueryResponse { void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { - _responseMap.get(serverRoutingInstance).receiveDataTable(dataTable, responseSize, deserializationTimeMs); + ServerResponse response = _responseMap.get(serverRoutingInstance); + response.receiveDataTable(dataTable, responseSize, deserializationTimeMs); + + // Record query completion stats immediately after receiving the response from the server instead of waiting + // for all servers to respond.
This helps to keep the stats up-to-date. + long latencyMs = response.getResponseDelayMs(); + _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, serverRoutingInstance.getInstanceId(), + latencyMs); + _numServersResponded.getAndIncrement(); _countDownLatch.countDown(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index f2cb638d00..68c58088c3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -120,7 +120,8 @@ public class QueryRouter { // Create the asynchronous query response with the request map AsyncQueryResponse asyncQueryResponse = - new AsyncQueryResponse(this, requestId, requestMap.keySet(), System.currentTimeMillis(), timeoutMs); + new AsyncQueryResponse(this, requestId, requestMap.keySet(), System.currentTimeMillis(), timeoutMs, + _serverRoutingStatsManager); _asyncQueryResponseMap.put(requestId, asyncQueryResponse); for (Map.Entry<ServerRoutingInstance, InstanceRequest> entry : requestMap.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey(); @@ -153,8 +154,6 @@ public class QueryRouter { AsyncQueryResponse asyncQueryResponse, Exception e) { LOGGER.error("Caught exception while sending request {} to server: {}, marking query failed", requestId, serverRoutingInstance, e); - _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(), - (int) asyncQueryResponse.getTimeoutMs()); asyncQueryResponse.markQueryFailed(serverRoutingInstance, e); } @@ -188,20 +187,12 @@ public class QueryRouter { // Query future might be null if the query is already done (maybe due to failure) if (asyncQueryResponse != null) { asyncQueryResponse.receiveDataTable(serverRoutingInstance, dataTable, responseSize, deserializationTimeMs); - - // Record query completion 
stats immediately after receiving the response from the server instead of waiting - // for the reduce phase. - long latencyMs = asyncQueryResponse.getServerResponseDelayMs(serverRoutingInstance); - _serverRoutingStatsManager.recordStatsUponResponseArrival(requestId, serverRoutingInstance.getInstanceId(), - latencyMs); } } void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception exception) { for (AsyncQueryResponse asyncQueryResponse : _asyncQueryResponseMap.values()) { asyncQueryResponse.markServerDown(serverRoutingInstance, exception); - _serverRoutingStatsManager.recordStatsUponResponseArrival(asyncQueryResponse.getRequestId(), - serverRoutingInstance.getInstanceId(), (int) asyncQueryResponse.getTimeoutMs()); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 9fdaf7fb7d..5e1fa04176 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.transport; import com.google.common.util.concurrent.Futures; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.common.datatable.DataTable; @@ -33,7 +34,9 @@ import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -60,10 +63,18 @@ public class QueryRoutingTest { 
Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList()); private QueryRouter _queryRouter; + private ServerRoutingStatsManager _serverRoutingStatsManager; + int _requestCount; @BeforeClass public void setUp() { - _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), mock(ServerRoutingStatsManager.class)); + Map<String, Object> properties = new HashMap<>(); + properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true); + PinotConfiguration cfg = new PinotConfiguration(properties); + _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg); + _serverRoutingStatsManager.init(); + _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), _serverRoutingStatsManager); + _requestCount = 0; } private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) { @@ -88,6 +99,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, responseBytes); @@ -95,13 +107,17 @@ public class QueryRoutingTest { // OFFLINE only AsyncQueryResponse asyncQueryResponse = - _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 600_000L); Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses(); assertEquals(response.size(), 1); assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + // 2 requests - query submit and query 
response. + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // REALTIME only asyncQueryResponse = @@ -112,6 +128,9 @@ public class QueryRoutingTest { serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Hybrid asyncQueryResponse = @@ -127,6 +146,9 @@ public class QueryRoutingTest { serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE); assertNotNull(serverResponse.getDataTable()); assertEquals(serverResponse.getResponseSize(), responseBytes.length); + _requestCount += 4; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -136,6 +158,7 @@ public class QueryRoutingTest { public void testInvalidResponse() throws Exception { long requestId = 123; + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, new byte[0]); @@ -154,6 +177,9 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should time out assertTrue(System.currentTimeMillis() - startTimeMs >= 1000); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -166,6 +192,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + 
String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(0, responseBytes); @@ -184,6 +211,9 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should time out assertTrue(System.currentTimeMillis() - startTimeMs >= 1000); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); // Shut down the server queryServer.shutDown(); @@ -199,6 +229,7 @@ public class QueryRoutingTest { DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); // Start the server QueryServer queryServer = getQueryServer(500, responseBytes); @@ -221,6 +252,10 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should early terminate assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); + // Submit query after server is down startTimeMs = System.currentTimeMillis(); @@ -237,6 +272,15 @@ public class QueryRoutingTest { assertEquals(serverResponse.getDeserializationTimeMs(), 0); // Query should early terminate assertTrue(System.currentTimeMillis() - startTimeMs < timeoutMs); + _requestCount += 2; + waitForStatsUpdate(_requestCount); + assertEquals(_serverRoutingStatsManager.fetchNumInFlightRequestsForServer(serverId).intValue(), 0); + } + + private void waitForStatsUpdate(long taskCount) { + TestUtils.waitForCondition(aVoid -> { + return (_serverRoutingStatsManager.getCompletedTaskCount() == taskCount); + }, 5L, 5000, "Failed to record stats for AdaptiveServerSelectorTest"); } 
@AfterClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org