This is an automated email from the ASF dual-hosted git repository.

jiaguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 604b244b64 Optimize Adaptive Server Selection (#13952)
604b244b64 is described below

commit 604b244b645dd033b2a1737e70dbca620aad42b0
Author: praveenc7 <praveenkchagan...@gmail.com>
AuthorDate: Wed Sep 11 17:24:40 2024 -0700

    Optimize Adaptive Server Selection (#13952)
    
    * Improve time complexity for adaptiveServer
    
    * Remove additional comments
    
    * Remove additional comments
    
    * review comments
    
    * move hashset
    
    * move hashset
    
    * remove hashset
    
    * functional unit test
    
    * add test
    
    * lint
    
    * fix round-robin condition
    
    * loop fix
---
 .../ReplicaGroupInstanceSelector.java              | 46 ++++++++---------
 .../instanceselector/InstanceSelectorTest.java     | 60 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index c766791f2d..374c5235f3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing.instanceselector;
 
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,17 +76,18 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
       SegmentStates segmentStates, Map<String, String> queryOptions) {
     if (_adaptiveServerSelector != null) {
       // Adaptive Server Selection is enabled.
-      List<String> serverRankList = new ArrayList<>();
       List<String> candidateServers = fetchCandidateServersForQuery(segments, 
segmentStates);
 
       // Fetch serverRankList before looping through all the segments. This is 
important to make sure that we pick
       // the least amount of instances for a query by referring to a single 
snapshot of the rankings.
       List<Pair<String, Double>> serverRankListWithScores =
           
_adaptiveServerSelector.fetchServerRankingsWithScores(candidateServers);
-      for (Pair<String, Double> entry : serverRankListWithScores) {
-        serverRankList.add(entry.getLeft());
+      Map<String, Integer> serverRankMap = new HashMap<>();
+      for (int idx = 0; idx < serverRankListWithScores.size(); idx++) {
+        Pair<String, Double> entry = serverRankListWithScores.get(idx);
+        serverRankMap.put(entry.getLeft(), idx);
       }
-      return selectServersUsingAdaptiveServerSelector(segments, requestId, 
segmentStates, serverRankList);
+      return selectServersUsingAdaptiveServerSelector(segments, requestId, 
segmentStates, serverRankMap);
     } else {
       // Adaptive Server Selection is NOT enabled.
       return selectServersUsingRoundRobin(segments, requestId, segmentStates, 
queryOptions);
@@ -135,7 +137,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
   }
 
   private Pair<Map<String, String>, Map<String, String>> 
selectServersUsingAdaptiveServerSelector(List<String> segments,
-      int requestId, SegmentStates segmentStates, List<String> serverRankList) 
{
+      int requestId, SegmentStates segmentStates, Map<String, Integer> 
serverRankMap) {
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
     // No need to adjust this map per total segment numbers, as optional 
segments should be empty most of the time.
     Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
@@ -146,26 +148,20 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
       if (candidates == null) {
         continue;
       }
-      // Round Robin.
-      int numCandidates = candidates.size();
-      int instanceIdx = requestId % numCandidates;
-      SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
-      // Adaptive Server Selection
-      // TODO: Support numReplicaGroupsToQuery with Adaptive Server Selection.
-      if (!serverRankList.isEmpty()) {
-        int minIdx = Integer.MAX_VALUE;
-        for (SegmentInstanceCandidate candidate : candidates) {
-          int idx = serverRankList.indexOf(candidate.getInstance());
-          if (idx == -1) {
-            // Let's use the round-robin approach until stats for all servers 
are populated.
-            selectedInstance = candidates.get(instanceIdx);
-            break;
-          }
-          if (idx < minIdx) {
-            minIdx = idx;
-            selectedInstance = candidate;
-          }
-        }
+
+      // Round Robin selection
+      int roundRobinInstanceIdx = requestId % candidates.size();
+      SegmentInstanceCandidate selectedInstance = 
candidates.get(roundRobinInstanceIdx);
+
+      // Adaptive Server Selection logic
+      if (!serverRankMap.isEmpty()) {
+        // Use instance with the best rank if all servers have stats 
populated, if not use round-robin selected instance
+        selectedInstance = candidates.stream()
+            .anyMatch(candidate -> 
!serverRankMap.containsKey(candidate.getInstance()))
+            ? candidates.get(roundRobinInstanceIdx)
+            : candidates.stream()
+                .min(Comparator.comparingInt(candidate -> 
serverRankMap.get(candidate.getInstance())))
+                .orElse(candidates.get(roundRobinInstanceIdx));
       }
       // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
       // broker or server can skip it upon any issue to process it.
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index cc4f355369..daec569fb7 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -36,11 +36,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.adaptiveserverselector.HybridSelector;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -1957,4 +1959,62 @@ public class InstanceSelectorTest {
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedBalancedInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
+
+  @Test
+  public void testReplicaGroupAdaptiveServerSelector() {
+    // Arrange
+    String offlineTableName = "testTable_OFFLINE";
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    HybridSelector hybridSelector = mock(HybridSelector.class);
+    ReplicaGroupInstanceSelector instanceSelector = new 
ReplicaGroupInstanceSelector(
+        offlineTableName, propertyStore, brokerMetrics, hybridSelector, 
Clock.systemUTC(), false, 300);
+
+    // Define instances and segments
+    String instance0 = "instance0";
+    String instance1 = "instance1";
+    String instance2 = "instance2";
+    String instance3 = "instance3";
+    String instance4 = "instance4";
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    String segment2 = "segment2";
+    List<String> segments = Arrays.asList(segment0, segment1, segment2);
+
+    // Define candidates for each segment
+    Map<String, List<SegmentInstanceCandidate>> instanceCandidatesMap = new 
HashMap<>();
+    // segment0 -> instance0, instance1
+    instanceCandidatesMap.put(segment0, Arrays.asList(new 
SegmentInstanceCandidate(instance0, true),
+        new SegmentInstanceCandidate(instance1, true)));
+    // segment1 -> instance2, instance3
+    instanceCandidatesMap.put(segment1, Arrays.asList(new 
SegmentInstanceCandidate(instance2, true),
+        new SegmentInstanceCandidate(instance3, true)));
+    // segment2 -> instance3, instance4 // instance4 is not in the hybrid 
selector's server ranking
+    instanceCandidatesMap.put(segment2, Arrays.asList(new 
SegmentInstanceCandidate(instance4, true),
+        new SegmentInstanceCandidate(instance3, true)));
+
+    // Define the segment states
+    SegmentStates segmentStates = new SegmentStates(instanceCandidatesMap, new 
HashSet<>(segments), null);
+
+    // Define server rankings
+    List<Pair<String, Double>> serverRanks = Arrays.asList(
+        new ImmutablePair<>(instance3, 1.0),
+        new ImmutablePair<>(instance2, 2.0),
+        new ImmutablePair<>(instance1, 3.0),
+        new ImmutablePair<>(instance0, 4.0)
+    );
+    
when(hybridSelector.fetchServerRankingsWithScores(any())).thenReturn(serverRanks);
+
+    // Act
+    Pair<Map<String, String>, Map<String, String>> selectedResult = 
instanceSelector.select(segments, 0,
+        segmentStates, null);
+
+    // Assert
+    Map<String, String> expectedSelection = new HashMap<>();
+    expectedSelection.put(segment0, instance1);
+    expectedSelection.put(segment1, instance3);
+    expectedSelection.put(segment2, instance4);
+
+    assertEquals(selectedResult.getLeft(), expectedSelection);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to