This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 93585ebfb7 Improving unit tests for MultiStageReplicaGroupSelector (#15970)
93585ebfb7 is described below

commit 93585ebfb7fc8f8449be2c3792a04095c20be245
Author: Shaurya Chaturvedi <shauryach...@gmail.com>
AuthorDate: Fri Jun 13 07:42:30 2025 -0700

    Improving unit tests for MultiStageReplicaGroupSelector (#15970)
---
 .../instanceselector/InstanceSelectorTest.java     | 222 ----------------
 .../MultiStageReplicaGroupSelectorTest.java        | 294 +++++++++++++++++++++
 2 files changed, 294 insertions(+), 222 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index d319252685..318bfea776 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -43,7 +43,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.HybridSelector;
-import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -76,7 +75,6 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 
 @SuppressWarnings("unchecked")
@@ -987,226 +985,6 @@ public class InstanceSelectorTest {
     assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
   }
 
-  @Test
-  public void testMultiStageStrictReplicaGroupSelector() {
-    String offlineTableName = "testTable_OFFLINE";
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
-    // Create instance-partitions with two replica-groups and 1 partition. 
Each replica-group has 2 instances.
-    List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
-    List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
-    Map<String, List<String>> partitionToInstances = ImmutableMap.of("0_0", 
replicaGroup0, "0_1", replicaGroup1);
-    InstancePartitions instancePartitions = new 
InstancePartitions(offlineTableName);
-    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
-    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
-    BrokerRequest brokerRequest = mock(BrokerRequest.class);
-    PinotQuery pinotQuery = mock(PinotQuery.class);
-    Map<String, String> queryOptions = new HashMap<>();
-    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
-    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
-
-    MultiStageReplicaGroupSelector multiStageSelector =
-        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false, 300);
-    multiStageSelector = spy(multiStageSelector);
-    
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
-
-    List<String> enabledInstances = new ArrayList<>();
-    IdealState idealState = new IdealState(offlineTableName);
-    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
-    ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
-    Set<String> onlineSegments = new HashSet<>();
-
-    // Mark all instances as enabled
-    for (int i = 0; i < 4; i++) {
-      enabledInstances.add(String.format("instance-%d", i));
-    }
-
-    List<String> segments = getSegments();
-
-    // Create two idealState and externalView maps. One is used for segments 
with replica-group=0 and the other for rg=1
-    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
-    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
-    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
-    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
-
-    // instance-0 and instance-2 mirror each other in the two replica-groups. 
Same for instance-1 and instance-3.
-    for (int i = 0; i < 4; i++) {
-      String instance = enabledInstances.get(i);
-      if (i % 2 == 0) {
-        idealStateInstanceStateMap0.put(instance, ONLINE);
-        externalViewInstanceStateMap0.put(instance, ONLINE);
-      } else {
-        idealStateInstanceStateMap1.put(instance, ONLINE);
-        externalViewInstanceStateMap1.put(instance, ONLINE);
-      }
-    }
-
-    // Even numbered segments get assigned to [instance-0, instance-2], and 
odd numbered segments get assigned to
-    // [instance-1,instance-3].
-    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
-      String segment = segments.get(segmentNum);
-      if (segmentNum % 2 == 0) {
-        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0);
-        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap0);
-      } else {
-        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1);
-        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap1);
-      }
-      onlineSegments.add(segment);
-    }
-
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
-        onlineSegments);
-
-    // Using requestId=0 should select replica-group 0. Even segments get 
assigned to instance-0 and odd segments get
-    // assigned to instance-1.
-    Map<String, String> expectedReplicaGroupInstanceSelectorResult = new 
HashMap<>();
-    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
-      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
replicaGroup0.get(segmentNum % 2));
-    }
-    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(brokerRequest, segments, 0);
-    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
-
-    // Using same requestId again should return the same selection
-    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
-    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
-
-    // Using requestId=1 should select replica-group 1
-    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
-    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
-      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
replicaGroup1.get(segmentNum % 2));
-    }
-    selectionResult = multiStageSelector.select(brokerRequest, segments, 1);
-    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
-
-    // If instance-0 is down, replica-group 1 should be picked even with 
requestId=0
-    enabledInstances.remove("instance-0");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
-        onlineSegments);
-    selectionResult = multiStageSelector.select(brokerRequest, segments, 0);
-    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
-
-    // If instance-2 also goes down, no replica-group is eligible
-    enabledInstances.remove("instance-2");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
-        onlineSegments);
-    try {
-      multiStageSelector.select(brokerRequest, segments, 0);
-      fail("Method call above should have failed");
-    } catch (Exception ignored) {
-    }
-  }
-
-  @Test
-  public void testMultiStageStrictReplicaGroupSelectorForSomeErrorSegments() {
-    String offlineTableName = "testTable_OFFLINE";
-    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
-    BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
-
-    // Create instance-partitions with two replica-groups and 2 partitions. 
Each replica-group has 2 instances.
-    Map<String, List<String>> partitionToInstances = ImmutableMap.of(
-      "0_0", ImmutableList.of("instance-0"),
-      "0_1", ImmutableList.of("instance-2"),
-      "1_0", ImmutableList.of("instance-1"),
-      "1_1", ImmutableList.of("instance-3"));
-    InstancePartitions instancePartitions = new 
InstancePartitions(offlineTableName);
-    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
-    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
-    instancePartitions.setInstances(1, 0, partitionToInstances.get("1_0"));
-    instancePartitions.setInstances(1, 1, partitionToInstances.get("1_1"));
-
-    BrokerRequest brokerRequest = mock(BrokerRequest.class);
-    PinotQuery pinotQuery = mock(PinotQuery.class);
-    Map<String, String> queryOptions = new HashMap<>();
-    when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
-    when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
-
-    MultiStageReplicaGroupSelector multiStageSelector =
-        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
-            false, 300);
-    multiStageSelector = spy(multiStageSelector);
-    
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
-
-    List<String> enabledInstances = new ArrayList<>();
-    IdealState idealState = new IdealState(offlineTableName);
-    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
-    ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
-    Set<String> onlineSegments = new HashSet<>();
-
-    // Mark all instances as enabled
-    for (int i = 0; i < 4; i++) {
-      enabledInstances.add(String.format("instance-%d", i));
-    }
-
-    List<String> segments = getSegments();
-
-    // Create two idealState and externalView maps. One is used for segments 
with replica-group=0 and the other for rg=1
-    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
-    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
-    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
-    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
-
-    // instance-0 and instance-2 mirror each other in the two replica-groups. 
Same for instance-1 and instance-3.
-    for (int i = 0; i < 4; i++) {
-      String instance = enabledInstances.get(i);
-      if (i % 2 == 0) {
-        idealStateInstanceStateMap0.put(instance, ONLINE);
-        externalViewInstanceStateMap0.put(instance, ONLINE);
-      } else {
-        idealStateInstanceStateMap1.put(instance, ONLINE);
-        externalViewInstanceStateMap1.put(instance, ONLINE);
-      }
-    }
-
-    // Even numbered segments get assigned to [instance-0, instance-2], and 
odd numbered segments get assigned to
-    // [instance-1,instance-3].
-    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
-      String segment = segments.get(segmentNum);
-      if (segmentNum % 2 == 0) {
-        idealStateSegmentAssignment.put(segment, new 
HashMap<>(idealStateInstanceStateMap0));
-        externalViewSegmentAssignment.put(segment, new 
HashMap<>(externalViewInstanceStateMap0));
-      } else {
-        idealStateSegmentAssignment.put(segment, new 
HashMap<>(idealStateInstanceStateMap1));
-        externalViewSegmentAssignment.put(segment, new 
HashMap<>(externalViewInstanceStateMap1));
-      }
-      onlineSegments.add(segment);
-    }
-
-    // Set one segment in each replica group to ERROR state.
-    externalViewSegmentAssignment.get("segment1").put("instance-1", "ERROR");
-    externalViewSegmentAssignment.get("segment2").put("instance-2", "ERROR");
-
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
-        onlineSegments);
-
-    // Even though instance-0 and instance-3 belong to different replica 
groups, they handle exclusive sets of segments
-    // and hence they can together serve all segments.
-    Map<String, String> expectedReplicaGroupInstanceSelectorResult = new 
HashMap<>();
-    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
-      if (segmentNum % 2 == 0) {
-        
expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
"instance-0");
-      } else {
-        
expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
"instance-3");
-      }
-    }
-    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(brokerRequest, segments, 0);
-    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedReplicaGroupInstanceSelectorResult);
-
-    // If instance-3 has an error segment as well, there is no replica group 
available to serve complete set of
-    // segments.
-    externalViewSegmentAssignment.get("segment3").put("instance-3", "ERROR");
-    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
-        onlineSegments);
-    try {
-      multiStageSelector.select(brokerRequest, segments, 0);
-      fail("Method call above should have failed");
-    } catch (Exception ignored) {
-    }
-  }
-
   @Test
   public void testUnavailableSegments() {
     String offlineTableName = "testTable_OFFLINE";
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
new file mode 100644
index 0000000000..8f72a7982e
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+
+public class MultiStageReplicaGroupSelectorTest {
+  private static final String TABLE_NAME = "testTable_OFFLINE";
+  private final static List<String> SEGMENTS =
+      Arrays.asList("segment0", "segment1", "segment2", "segment3", 
"segment4", "segment5", "segment6", "segment7",
+          "segment8", "segment9", "segment10", "segment11");
+  private static final Map<String, ServerInstance> EMPTY_SERVER_MAP = 
Collections.EMPTY_MAP;
+  private AutoCloseable _mocks;
+  @Mock
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  @Mock
+  private BrokerMetrics _brokerMetrics;
+  @Mock
+  private BrokerRequest _brokerRequest;
+  @Mock
+  private PinotQuery _pinotQuery;
+
+  private static List<String> getSegments() {
+    return SEGMENTS;
+  }
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    when(_brokerRequest.getPinotQuery()).thenReturn(_pinotQuery);
+    when(_pinotQuery.getQueryOptions()).thenReturn(null);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testBasicReplicaGroupSelection() {
+    // Create instance-partitions with two replica-groups and 1 partition. 
Each replica-group has 2 instances.
+    List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
+    List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
+    InstancePartitions instancePartitions = 
createInstancePartitions(replicaGroup0, replicaGroup1);
+    MultiStageReplicaGroupSelector multiStageSelector = 
createMultiStageSelector(instancePartitions);
+
+    List<String> enabledInstances = createEnabledInstances(4);
+    IdealState idealState = new IdealState(TABLE_NAME);
+    ExternalView externalView = new ExternalView(TABLE_NAME);
+    Set<String> onlineSegments = new HashSet<>();
+
+    setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+
+    // Using requestId=0 should select replica-group 0. Even segments get 
assigned to instance-0 and odd segments get
+    // assigned to instance-1.
+    Map<String, String> expectedSelectorResult = 
createExpectedAssignment(replicaGroup0, getSegments());
+    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(_brokerRequest, getSegments(), 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedSelectorResult);
+
+    // Using same requestId again should return the same selection
+    selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 
0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedSelectorResult);
+
+    // Using requestId=1 should select replica-group 1
+    expectedSelectorResult = createExpectedAssignment(replicaGroup1, 
getSegments());
+    selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 
1);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedSelectorResult);
+  }
+
+  @Test
+  public void testInstanceFailureHandling() {
+    // Create instance-partitions with two replica-groups and 1 partition. 
Each replica-group has 2 instances.
+    List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1");
+    List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3");
+    InstancePartitions instancePartitions = 
createInstancePartitions(replicaGroup0, replicaGroup1);
+    MultiStageReplicaGroupSelector multiStageSelector = 
createMultiStageSelector(instancePartitions);
+
+    List<String> enabledInstances = createEnabledInstances(4);
+    IdealState idealState = new IdealState(TABLE_NAME);
+    ExternalView externalView = new ExternalView(TABLE_NAME);
+    Set<String> onlineSegments = new HashSet<>();
+
+    setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+
+    // If instance-0 is down, replica-group 1 should be picked even with 
requestId=0
+    enabledInstances.remove("instance-0");
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+    Map<String, String> expectedSelectorResult = 
createExpectedAssignment(replicaGroup1, getSegments());
+    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(_brokerRequest, getSegments(), 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedSelectorResult);
+
+    // If instance-2 also goes down, no replica-group is eligible
+    enabledInstances.remove("instance-2");
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+    try {
+      multiStageSelector.select(_brokerRequest, getSegments(), 0);
+      fail("Method call above should have failed");
+    } catch (Exception ignored) {
+    }
+  }
+
+  @Test
+  public void testErrorSegmentHandling() {
+    // Create instance-partitions with two replica-groups and 2 partitions. 
Each replica-group has 2 instances.
+    Map<String, List<String>> partitionToInstances = ImmutableMap.of(
+        "0_0", ImmutableList.of("instance-0"),
+        "0_1", ImmutableList.of("instance-2"),
+        "1_0", ImmutableList.of("instance-1"),
+        "1_1", ImmutableList.of("instance-3"));
+    InstancePartitions instancePartitions = new InstancePartitions(TABLE_NAME);
+    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
+    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
+    instancePartitions.setInstances(1, 0, partitionToInstances.get("1_0"));
+    instancePartitions.setInstances(1, 1, partitionToInstances.get("1_1"));
+
+    MultiStageReplicaGroupSelector multiStageSelector = 
createMultiStageSelector(instancePartitions);
+
+    List<String> enabledInstances = createEnabledInstances(4);
+    IdealState idealState = new IdealState(TABLE_NAME);
+    ExternalView externalView = new ExternalView(TABLE_NAME);
+    Set<String> onlineSegments = new HashSet<>();
+
+    setupBasicTestEnvironment(enabledInstances, idealState, externalView, 
onlineSegments);
+
+    // Set one segment in each replica group to ERROR state.
+    externalView.getRecord().getMapFields().get("segment1").put("instance-1", 
"ERROR");
+    externalView.getRecord().getMapFields().get("segment2").put("instance-2", 
"ERROR");
+
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+
+    // Even though instance-0 and instance-3 belong to different replica 
groups, they handle exclusive sets of segments
+    // and hence they can together serve all segments.
+    Map<String, String> expectedSelectorResult = new HashMap<>();
+    for (int segmentNum = 0; segmentNum < getSegments().size(); segmentNum++) {
+      if (segmentNum % 2 == 0) {
+        expectedSelectorResult.put(getSegments().get(segmentNum), 
"instance-0");
+      } else {
+        expectedSelectorResult.put(getSegments().get(segmentNum), 
"instance-3");
+      }
+    }
+    InstanceSelector.SelectionResult selectionResult = 
multiStageSelector.select(_brokerRequest, getSegments(), 0);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), 
expectedSelectorResult);
+
+    // If instance-3 has an error segment as well, there is no replica group 
available to serve complete set of
+    // segments.
+    externalView.getRecord().getMapFields().get("segment3").put("instance-3", 
"ERROR");
+    multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, 
idealState, externalView,
+        onlineSegments);
+    try {
+      multiStageSelector.select(_brokerRequest, getSegments(), 0);
+      fail("Method call above should have failed");
+    } catch (Exception ignored) {
+    }
+  }
+
+  private MultiStageReplicaGroupSelector 
createMultiStageSelector(InstancePartitions instancePartitions) {
+    MultiStageReplicaGroupSelector multiStageSelector =
+        new MultiStageReplicaGroupSelector(TABLE_NAME, _propertyStore, 
_brokerMetrics, null, Clock.systemUTC(),
+            false, 300);
+    multiStageSelector = spy(multiStageSelector);
+    
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
+    return multiStageSelector;
+  }
+
+  private InstancePartitions createInstancePartitions(List<String> 
replicaGroup0, List<String> replicaGroup1) {
+    Map<String, List<String>> partitionToInstances = ImmutableMap.of("0_0", 
replicaGroup0, "0_1", replicaGroup1);
+    InstancePartitions instancePartitions = new InstancePartitions(TABLE_NAME);
+    instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0"));
+    instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1"));
+    return instancePartitions;
+  }
+
+  private void setupInstanceStates(List<String> enabledInstances, Map<String, 
String> idealStateInstanceStateMap0,
+      Map<String, String> externalViewInstanceStateMap0, Map<String, String> 
idealStateInstanceStateMap1,
+      Map<String, String> externalViewInstanceStateMap1) {
+    for (int i = 0; i < enabledInstances.size(); i++) {
+      String instance = enabledInstances.get(i);
+      if (i % 2 == 0) {
+        idealStateInstanceStateMap0.put(instance, ONLINE);
+        externalViewInstanceStateMap0.put(instance, ONLINE);
+      } else {
+        idealStateInstanceStateMap1.put(instance, ONLINE);
+        externalViewInstanceStateMap1.put(instance, ONLINE);
+      }
+    }
+  }
+
+  private void setupSegmentAssignments(List<String> segments,
+      Map<String, Map<String, String>> idealStateSegmentAssignment,
+      Map<String, Map<String, String>> externalViewSegmentAssignment, 
Map<String, String> idealStateInstanceStateMap0,
+      Map<String, String> externalViewInstanceStateMap0, Map<String, String> 
idealStateInstanceStateMap1,
+      Map<String, String> externalViewInstanceStateMap1, Set<String> 
onlineSegments) {
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      String segment = segments.get(segmentNum);
+      if (segmentNum % 2 == 0) {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0);
+        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap0);
+      } else {
+        idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1);
+        externalViewSegmentAssignment.put(segment, 
externalViewInstanceStateMap1);
+      }
+      onlineSegments.add(segment);
+    }
+  }
+
+  private Map<String, String> createExpectedAssignment(List<String> 
replicaGroup, List<String> segments) {
+    Map<String, String> expectedReplicaGroupInstanceSelectorResult = new 
HashMap<>();
+    for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+      expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), 
replicaGroup.get(segmentNum % 2));
+    }
+    return expectedReplicaGroupInstanceSelectorResult;
+  }
+
+  private void setupBasicTestEnvironment(List<String> enabledInstances, 
IdealState idealState,
+      ExternalView externalView, Set<String> onlineSegments) {
+    Map<String, Map<String, String>> idealStateSegmentAssignment = 
idealState.getRecord().getMapFields();
+    Map<String, Map<String, String>> externalViewSegmentAssignment = 
externalView.getRecord().getMapFields();
+
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+
+    setupInstanceStates(enabledInstances, idealStateInstanceStateMap0, 
externalViewInstanceStateMap0,
+        idealStateInstanceStateMap1, externalViewInstanceStateMap1);
+    setupSegmentAssignments(getSegments(), idealStateSegmentAssignment, 
externalViewSegmentAssignment,
+        idealStateInstanceStateMap0, externalViewInstanceStateMap0, 
idealStateInstanceStateMap1,
+        externalViewInstanceStateMap1, onlineSegments);
+  }
+
+  private List<String> createEnabledInstances(int count) {
+    List<String> enabledInstances = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      enabledInstances.add(String.format("instance-%d", i));
+    }
+    return enabledInstances;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to