This is an automated email from the ASF dual-hosted git repository. jiaguo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 604b244b64 Optimize Adaptive Server Selection (#13952) 604b244b64 is described below commit 604b244b645dd033b2a1737e70dbca620aad42b0 Author: praveenc7 <praveenkchagan...@gmail.com> AuthorDate: Wed Sep 11 17:24:40 2024 -0700 Optimize Adaptive Server Selection (#13952) * Improve time complexity for adaptiveServer * Remove additional comments * Remove additional comments * review comments * move hashset * move hashset * remove hashset * functional unit test * add test * lint * fix round-robin condition * loop fix --- .../ReplicaGroupInstanceSelector.java | 46 ++++++++--------- .../instanceselector/InstanceSelectorTest.java | 60 ++++++++++++++++++++++ 2 files changed, 81 insertions(+), 25 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java index c766791f2d..374c5235f3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java @@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing.instanceselector; import java.time.Clock; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,17 +76,18 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { SegmentStates segmentStates, Map<String, String> queryOptions) { if (_adaptiveServerSelector != null) { // Adaptive Server Selection is enabled. - List<String> serverRankList = new ArrayList<>(); List<String> candidateServers = fetchCandidateServersForQuery(segments, segmentStates); // Fetch serverRankList before looping through all the segments. 
This is important to make sure that we pick // the least amount of instances for a query by referring to a single snapshot of the rankings. List<Pair<String, Double>> serverRankListWithScores = _adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers); - for (Pair<String, Double> entry : serverRankListWithScores) { - serverRankList.add(entry.getLeft()); + Map<String, Integer> serverRankMap = new HashMap<>(); + for (int idx = 0; idx < serverRankListWithScores.size(); idx++) { + Pair<String, Double> entry = serverRankListWithScores.get(idx); + serverRankMap.put(entry.getLeft(), idx); } - return selectServersUsingAdaptiveServerSelector(segments, requestId, segmentStates, serverRankList); + return selectServersUsingAdaptiveServerSelector(segments, requestId, segmentStates, serverRankMap); } else { // Adaptive Server Selection is NOT enabled. return selectServersUsingRoundRobin(segments, requestId, segmentStates, queryOptions); @@ -135,7 +137,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } private Pair<Map<String, String>, Map<String, String>> selectServersUsingAdaptiveServerSelector(List<String> segments, - int requestId, SegmentStates segmentStates, List<String> serverRankList) { + int requestId, SegmentStates segmentStates, Map<String, Integer> serverRankMap) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time. Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); @@ -146,26 +148,20 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { if (candidates == null) { continue; } - // Round Robin. 
- int numCandidates = candidates.size(); - int instanceIdx = requestId % numCandidates; - SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx); - // Adaptive Server Selection - // TODO: Support numReplicaGroupsToQuery with Adaptive Server Selection. - if (!serverRankList.isEmpty()) { - int minIdx = Integer.MAX_VALUE; - for (SegmentInstanceCandidate candidate : candidates) { - int idx = serverRankList.indexOf(candidate.getInstance()); - if (idx == -1) { - // Let's use the round-robin approach until stats for all servers are populated. - selectedInstance = candidates.get(instanceIdx); - break; - } - if (idx < minIdx) { - minIdx = idx; - selectedInstance = candidate; - } - } + + // Round Robin selection + int roundRobinInstanceIdx = requestId % candidates.size(); + SegmentInstanceCandidate selectedInstance = candidates.get(roundRobinInstanceIdx); + + // Adaptive Server Selection logic + if (!serverRankMap.isEmpty()) { + // Use instance with the best rank if all servers have stats populated, if not use round-robin selected instance + selectedInstance = candidates.stream() + .anyMatch(candidate -> !serverRankMap.containsKey(candidate.getInstance())) + ? candidates.get(roundRobinInstanceIdx) + : candidates.stream() + .min(Comparator.comparingInt(candidate -> serverRankMap.get(candidate.getInstance()))) + .orElse(candidates.get(roundRobinInstanceIdx)); } // This can only be offline when it is a new segment. And such segment is marked as optional segment so that // broker or server can skip it upon any issue to process it. 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index cc4f355369..daec569fb7 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -36,11 +36,13 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.broker.routing.adaptiveserverselector.HybridSelector; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -1957,4 +1959,62 @@ public class InstanceSelectorTest { assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } + + @Test + public void testReplicaGroupAdaptiveServerSelector() { + // Arrange + String offlineTableName = "testTable_OFFLINE"; + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); + HybridSelector hybridSelector = mock(HybridSelector.class); + ReplicaGroupInstanceSelector instanceSelector = new ReplicaGroupInstanceSelector( + offlineTableName, propertyStore, brokerMetrics, hybridSelector, Clock.systemUTC(), false, 300); + + // Define instances and segments + String instance0 = "instance0"; + String instance1 = 
"instance1"; + String instance2 = "instance2"; + String instance3 = "instance3"; + String instance4 = "instance4"; + String segment0 = "segment0"; + String segment1 = "segment1"; + String segment2 = "segment2"; + List<String> segments = Arrays.asList(segment0, segment1, segment2); + + // Define candidates for each segment + Map<String, List<SegmentInstanceCandidate>> instanceCandidatesMap = new HashMap<>(); + // segment0 -> instance0, instance1 + instanceCandidatesMap.put(segment0, Arrays.asList(new SegmentInstanceCandidate(instance0, true), + new SegmentInstanceCandidate(instance1, true))); + // segment1 -> instance2, instance3 + instanceCandidatesMap.put(segment1, Arrays.asList(new SegmentInstanceCandidate(instance2, true), + new SegmentInstanceCandidate(instance3, true))); + // segment2 -> instance3, instance4 // instance4 is not in the hybrid selector's server ranking + instanceCandidatesMap.put(segment2, Arrays.asList(new SegmentInstanceCandidate(instance4, true), + new SegmentInstanceCandidate(instance3, true))); + + // Define the segment states + SegmentStates segmentStates = new SegmentStates(instanceCandidatesMap, new HashSet<>(segments), null); + + // Define server rankings + List<Pair<String, Double>> serverRanks = Arrays.asList( + new ImmutablePair<>(instance3, 1.0), + new ImmutablePair<>(instance2, 2.0), + new ImmutablePair<>(instance1, 3.0), + new ImmutablePair<>(instance0, 4.0) + ); + when(hybridSelector.fetchServerRankingsWithScores(any())).thenReturn(serverRanks); + + // Act + Pair<Map<String, String>, Map<String, String>> selectedResult = instanceSelector.select(segments, 0, + segmentStates, null); + + // Assert + Map<String, String> expectedSelection = new HashMap<>(); + expectedSelection.put(segment0, instance1); + expectedSelection.put(segment1, instance3); + expectedSelection.put(segment2, instance4); + + assertEquals(selectedResult.getLeft(), expectedSelection); + } } --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org