This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 93585ebfb7 Improving unit tests for MultiStageReplicaGroupSelector (#15970) 93585ebfb7 is described below commit 93585ebfb7fc8f8449be2c3792a04095c20be245 Author: Shaurya Chaturvedi <shauryach...@gmail.com> AuthorDate: Fri Jun 13 07:42:30 2025 -0700 Improving unit tests for MultiStageReplicaGroupSelector (#15970) --- .../instanceselector/InstanceSelectorTest.java | 222 ---------------- .../MultiStageReplicaGroupSelectorTest.java | 294 +++++++++++++++++++++ 2 files changed, 294 insertions(+), 222 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index d319252685..318bfea776 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -43,7 +43,6 @@ import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.routing.adaptiveserverselector.HybridSelector; -import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -76,7 +75,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; @SuppressWarnings("unchecked") @@ -987,226 +985,6 @@ public class InstanceSelectorTest { assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); } - @Test - public void testMultiStageStrictReplicaGroupSelector() { - String offlineTableName = "testTable_OFFLINE"; - ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); - // Create instance-partitions with two replica-groups and 1 partition. Each replica-group has 2 instances. - List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1"); - List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3"); - Map<String, List<String>> partitionToInstances = ImmutableMap.of("0_0", replicaGroup0, "0_1", replicaGroup1); - InstancePartitions instancePartitions = new InstancePartitions(offlineTableName); - instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0")); - instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1")); - BrokerRequest brokerRequest = mock(BrokerRequest.class); - PinotQuery pinotQuery = mock(PinotQuery.class); - Map<String, String> queryOptions = new HashMap<>(); - when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery); - when(pinotQuery.getQueryOptions()).thenReturn(queryOptions); - - MultiStageReplicaGroupSelector multiStageSelector = - new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false, 300); - multiStageSelector = spy(multiStageSelector); - doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions(); - - List<String> enabledInstances = new ArrayList<>(); - IdealState idealState = new IdealState(offlineTableName); - Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields(); - ExternalView externalView = new ExternalView(offlineTableName); - Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); - Set<String> onlineSegments = new HashSet<>(); - - // Mark all instances as enabled - for (int i = 0; i < 4; i++) { - enabledInstances.add(String.format("instance-%d", i)); - } - - List<String> segments = getSegments(); - - // Create two idealState and externalView maps. One is used for segments with replica-group=0 and the other for rg=1 - Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); - Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); - Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>(); - Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>(); - - // instance-0 and instance-2 mirror each other in the two replica-groups. Same for instance-1 and instance-3. - for (int i = 0; i < 4; i++) { - String instance = enabledInstances.get(i); - if (i % 2 == 0) { - idealStateInstanceStateMap0.put(instance, ONLINE); - externalViewInstanceStateMap0.put(instance, ONLINE); - } else { - idealStateInstanceStateMap1.put(instance, ONLINE); - externalViewInstanceStateMap1.put(instance, ONLINE); - } - } - - // Even numbered segments get assigned to [instance-0, instance-2], and odd numbered segments get assigned to - // [instance-1,instance-3]. - for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { - String segment = segments.get(segmentNum); - if (segmentNum % 2 == 0) { - idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0); - externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap0); - } else { - idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1); - externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap1); - } - onlineSegments.add(segment); - } - - multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, - onlineSegments); - - // Using requestId=0 should select replica-group 0. Even segments get assigned to instance-0 and odd segments get - // assigned to instance-1. - Map<String, String> expectedReplicaGroupInstanceSelectorResult = new HashMap<>(); - for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { - expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), replicaGroup0.get(segmentNum % 2)); - } - InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(brokerRequest, segments, 0); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); - - // Using same requestId again should return the same selection - selectionResult = multiStageSelector.select(brokerRequest, segments, 0); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); - - // Using requestId=1 should select replica-group 1 - expectedReplicaGroupInstanceSelectorResult = new HashMap<>(); - for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { - expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), replicaGroup1.get(segmentNum % 2)); - } - selectionResult = multiStageSelector.select(brokerRequest, segments, 1); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); - - // If instance-0 is down, replica-group 1 should be picked even with requestId=0 - enabledInstances.remove("instance-0"); - multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, - onlineSegments); - selectionResult = multiStageSelector.select(brokerRequest, segments, 0); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); - - // If instance-2 also goes down, no replica-group is eligible - enabledInstances.remove("instance-2"); - multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, - onlineSegments); - try { - multiStageSelector.select(brokerRequest, segments, 0); - fail("Method call above should have failed"); - } catch (Exception ignored) { - } - } - - @Test - public void testMultiStageStrictReplicaGroupSelectorForSomeErrorSegments() { - String offlineTableName = "testTable_OFFLINE"; - ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); - BrokerMetrics brokerMetrics = mock(BrokerMetrics.class); - - // Create instance-partitions with two replica-groups and 2 partitions. Each replica-group has 2 instances. - Map<String, List<String>> partitionToInstances = ImmutableMap.of( - "0_0", ImmutableList.of("instance-0"), - "0_1", ImmutableList.of("instance-2"), - "1_0", ImmutableList.of("instance-1"), - "1_1", ImmutableList.of("instance-3")); - InstancePartitions instancePartitions = new InstancePartitions(offlineTableName); - instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0")); - instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1")); - instancePartitions.setInstances(1, 0, partitionToInstances.get("1_0")); - instancePartitions.setInstances(1, 1, partitionToInstances.get("1_1")); - - BrokerRequest brokerRequest = mock(BrokerRequest.class); - PinotQuery pinotQuery = mock(PinotQuery.class); - Map<String, String> queryOptions = new HashMap<>(); - when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery); - when(pinotQuery.getQueryOptions()).thenReturn(queryOptions); - - MultiStageReplicaGroupSelector multiStageSelector = - new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), - false, 300); - multiStageSelector = spy(multiStageSelector); - doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions(); - - List<String> enabledInstances = new ArrayList<>(); - IdealState idealState = new IdealState(offlineTableName); - Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields(); - ExternalView externalView = new ExternalView(offlineTableName); - Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); - Set<String> onlineSegments = new HashSet<>(); - - // Mark all instances as enabled - for (int i = 0; i < 4; i++) { - enabledInstances.add(String.format("instance-%d", i)); - } - - List<String> segments = getSegments(); - - // Create two idealState and externalView maps. One is used for segments with replica-group=0 and the other for rg=1 - Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); - Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); - Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>(); - Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>(); - - // instance-0 and instance-2 mirror each other in the two replica-groups. Same for instance-1 and instance-3. - for (int i = 0; i < 4; i++) { - String instance = enabledInstances.get(i); - if (i % 2 == 0) { - idealStateInstanceStateMap0.put(instance, ONLINE); - externalViewInstanceStateMap0.put(instance, ONLINE); - } else { - idealStateInstanceStateMap1.put(instance, ONLINE); - externalViewInstanceStateMap1.put(instance, ONLINE); - } - } - - // Even numbered segments get assigned to [instance-0, instance-2], and odd numbered segments get assigned to - // [instance-1,instance-3]. - for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { - String segment = segments.get(segmentNum); - if (segmentNum % 2 == 0) { - idealStateSegmentAssignment.put(segment, new HashMap<>(idealStateInstanceStateMap0)); - externalViewSegmentAssignment.put(segment, new HashMap<>(externalViewInstanceStateMap0)); - } else { - idealStateSegmentAssignment.put(segment, new HashMap<>(idealStateInstanceStateMap1)); - externalViewSegmentAssignment.put(segment, new HashMap<>(externalViewInstanceStateMap1)); - } - onlineSegments.add(segment); - } - - // Set one segment in each replica group to ERROR state. - externalViewSegmentAssignment.get("segment1").put("instance-1", "ERROR"); - externalViewSegmentAssignment.get("segment2").put("instance-2", "ERROR"); - - multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, - onlineSegments); - - // Even though instance-0 and instance-3 belong to different replica groups, they handle exclusive sets of segments - // and hence they can together serve all segments. - Map<String, String> expectedReplicaGroupInstanceSelectorResult = new HashMap<>(); - for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { - if (segmentNum % 2 == 0) { - expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), "instance-0"); - } else { - expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), "instance-3"); - } - } - InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(brokerRequest, segments, 0); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult); - - // If instance-3 has an error segment as well, there is no replica group available to serve complete set of - // segments. - externalViewSegmentAssignment.get("segment3").put("instance-3", "ERROR"); - multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, - onlineSegments); - try { - multiStageSelector.select(brokerRequest, segments, 0); - fail("Method call above should have failed"); - } catch (Exception ignored) { - } - } - @Test public void testUnavailableSegments() { String offlineTableName = "testTable_OFFLINE"; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java new file mode 100644 index 0000000000..8f72a7982e --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing.instanceselector; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.core.transport.ServerInstance; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + + +public class MultiStageReplicaGroupSelectorTest { + private static final String TABLE_NAME = "testTable_OFFLINE"; + private final static List<String> SEGMENTS = + Arrays.asList("segment0", "segment1", "segment2", "segment3", "segment4", "segment5", "segment6", "segment7", + "segment8", "segment9", "segment10", "segment11"); + private static final Map<String, ServerInstance> EMPTY_SERVER_MAP = Collections.EMPTY_MAP; + private AutoCloseable _mocks; + @Mock + private ZkHelixPropertyStore<ZNRecord> _propertyStore; + @Mock + private BrokerMetrics _brokerMetrics; + @Mock + private BrokerRequest _brokerRequest; + @Mock + private PinotQuery _pinotQuery; + + private static List<String> getSegments() { + return SEGMENTS; + } + + @BeforeMethod + public void setUp() { + _mocks = MockitoAnnotations.openMocks(this); + when(_brokerRequest.getPinotQuery()).thenReturn(_pinotQuery); + when(_pinotQuery.getQueryOptions()).thenReturn(null); + } + + @AfterMethod + public void tearDown() throws Exception { + _mocks.close(); + } + + @Test + public void testBasicReplicaGroupSelection() { + // Create instance-partitions with two replica-groups and 1 partition. Each replica-group has 2 instances. + List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1"); + List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3"); + InstancePartitions instancePartitions = createInstancePartitions(replicaGroup0, replicaGroup1); + MultiStageReplicaGroupSelector multiStageSelector = createMultiStageSelector(instancePartitions); + + List<String> enabledInstances = createEnabledInstances(4); + IdealState idealState = new IdealState(TABLE_NAME); + ExternalView externalView = new ExternalView(TABLE_NAME); + Set<String> onlineSegments = new HashSet<>(); + + setupBasicTestEnvironment(enabledInstances, idealState, externalView, onlineSegments); + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + + // Using requestId=0 should select replica-group 0. Even segments get assigned to instance-0 and odd segments get + // assigned to instance-1. + Map<String, String> expectedSelectorResult = createExpectedAssignment(replicaGroup0, getSegments()); + InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 0); + assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectorResult); + + // Using same requestId again should return the same selection + selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 0); + assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectorResult); + + // Using requestId=1 should select replica-group 1 + expectedSelectorResult = createExpectedAssignment(replicaGroup1, getSegments()); + selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 1); + assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectorResult); + } + + @Test + public void testInstanceFailureHandling() { + // Create instance-partitions with two replica-groups and 1 partition. Each replica-group has 2 instances. + List<String> replicaGroup0 = ImmutableList.of("instance-0", "instance-1"); + List<String> replicaGroup1 = ImmutableList.of("instance-2", "instance-3"); + InstancePartitions instancePartitions = createInstancePartitions(replicaGroup0, replicaGroup1); + MultiStageReplicaGroupSelector multiStageSelector = createMultiStageSelector(instancePartitions); + + List<String> enabledInstances = createEnabledInstances(4); + IdealState idealState = new IdealState(TABLE_NAME); + ExternalView externalView = new ExternalView(TABLE_NAME); + Set<String> onlineSegments = new HashSet<>(); + + setupBasicTestEnvironment(enabledInstances, idealState, externalView, onlineSegments); + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + + // If instance-0 is down, replica-group 1 should be picked even with requestId=0 + enabledInstances.remove("instance-0"); + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + Map<String, String> expectedSelectorResult = createExpectedAssignment(replicaGroup1, getSegments()); + InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 0); + assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectorResult); + + // If instance-2 also goes down, no replica-group is eligible + enabledInstances.remove("instance-2"); + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + try { + multiStageSelector.select(_brokerRequest, getSegments(), 0); + fail("Method call above should have failed"); + } catch (Exception ignored) { + } + } + + @Test + public void testErrorSegmentHandling() { + // Create instance-partitions with two replica-groups and 2 partitions. Each replica-group has 2 instances. + Map<String, List<String>> partitionToInstances = ImmutableMap.of( + "0_0", ImmutableList.of("instance-0"), + "0_1", ImmutableList.of("instance-2"), + "1_0", ImmutableList.of("instance-1"), + "1_1", ImmutableList.of("instance-3")); + InstancePartitions instancePartitions = new InstancePartitions(TABLE_NAME); + instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0")); + instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1")); + instancePartitions.setInstances(1, 0, partitionToInstances.get("1_0")); + instancePartitions.setInstances(1, 1, partitionToInstances.get("1_1")); + + MultiStageReplicaGroupSelector multiStageSelector = createMultiStageSelector(instancePartitions); + + List<String> enabledInstances = createEnabledInstances(4); + IdealState idealState = new IdealState(TABLE_NAME); + ExternalView externalView = new ExternalView(TABLE_NAME); + Set<String> onlineSegments = new HashSet<>(); + + setupBasicTestEnvironment(enabledInstances, idealState, externalView, onlineSegments); + + // Set one segment in each replica group to ERROR state. + externalView.getRecord().getMapFields().get("segment1").put("instance-1", "ERROR"); + externalView.getRecord().getMapFields().get("segment2").put("instance-2", "ERROR"); + + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + + // Even though instance-0 and instance-3 belong to different replica groups, they handle exclusive sets of segments + // and hence they can together serve all segments. + Map<String, String> expectedSelectorResult = new HashMap<>(); + for (int segmentNum = 0; segmentNum < getSegments().size(); segmentNum++) { + if (segmentNum % 2 == 0) { + expectedSelectorResult.put(getSegments().get(segmentNum), "instance-0"); + } else { + expectedSelectorResult.put(getSegments().get(segmentNum), "instance-3"); + } + } + InstanceSelector.SelectionResult selectionResult = multiStageSelector.select(_brokerRequest, getSegments(), 0); + assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectorResult); + + // If instance-3 has an error segment as well, there is no replica group available to serve complete set of + // segments. + externalView.getRecord().getMapFields().get("segment3").put("instance-3", "ERROR"); + multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP, idealState, externalView, + onlineSegments); + try { + multiStageSelector.select(_brokerRequest, getSegments(), 0); + fail("Method call above should have failed"); + } catch (Exception ignored) { + } + } + + private MultiStageReplicaGroupSelector createMultiStageSelector(InstancePartitions instancePartitions) { + MultiStageReplicaGroupSelector multiStageSelector = + new MultiStageReplicaGroupSelector(TABLE_NAME, _propertyStore, _brokerMetrics, null, Clock.systemUTC(), + false, 300); + multiStageSelector = spy(multiStageSelector); + doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions(); + return multiStageSelector; + } + + private InstancePartitions createInstancePartitions(List<String> replicaGroup0, List<String> replicaGroup1) { + Map<String, List<String>> partitionToInstances = ImmutableMap.of("0_0", replicaGroup0, "0_1", replicaGroup1); + InstancePartitions instancePartitions = new InstancePartitions(TABLE_NAME); + instancePartitions.setInstances(0, 0, partitionToInstances.get("0_0")); + instancePartitions.setInstances(0, 1, partitionToInstances.get("0_1")); + return instancePartitions; + } + + private void setupInstanceStates(List<String> enabledInstances, Map<String, String> idealStateInstanceStateMap0, + Map<String, String> externalViewInstanceStateMap0, Map<String, String> idealStateInstanceStateMap1, + Map<String, String> externalViewInstanceStateMap1) { + for (int i = 0; i < enabledInstances.size(); i++) { + String instance = enabledInstances.get(i); + if (i % 2 == 0) { + idealStateInstanceStateMap0.put(instance, ONLINE); + externalViewInstanceStateMap0.put(instance, ONLINE); + } else { + idealStateInstanceStateMap1.put(instance, ONLINE); + externalViewInstanceStateMap1.put(instance, ONLINE); + } + } + } + + private void setupSegmentAssignments(List<String> segments, + Map<String, Map<String, String>> idealStateSegmentAssignment, + Map<String, Map<String, String>> externalViewSegmentAssignment, Map<String, String> idealStateInstanceStateMap0, + Map<String, String> externalViewInstanceStateMap0, Map<String, String> idealStateInstanceStateMap1, + Map<String, String> externalViewInstanceStateMap1, Set<String> onlineSegments) { + for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { + String segment = segments.get(segmentNum); + if (segmentNum % 2 == 0) { + idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap0); + externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap0); + } else { + idealStateSegmentAssignment.put(segment, idealStateInstanceStateMap1); + externalViewSegmentAssignment.put(segment, externalViewInstanceStateMap1); + } + onlineSegments.add(segment); + } + } + + private Map<String, String> createExpectedAssignment(List<String> replicaGroup, List<String> segments) { + Map<String, String> expectedReplicaGroupInstanceSelectorResult = new HashMap<>(); + for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) { + expectedReplicaGroupInstanceSelectorResult.put(segments.get(segmentNum), replicaGroup.get(segmentNum % 2)); + } + return expectedReplicaGroupInstanceSelectorResult; + } + + private void setupBasicTestEnvironment(List<String> enabledInstances, IdealState idealState, + ExternalView externalView, Set<String> onlineSegments) { + Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields(); + Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); + + Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>(); + Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); + Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>(); + Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>(); + + setupInstanceStates(enabledInstances, idealStateInstanceStateMap0, externalViewInstanceStateMap0, + idealStateInstanceStateMap1, externalViewInstanceStateMap1); + setupSegmentAssignments(getSegments(), idealStateSegmentAssignment, externalViewSegmentAssignment, + idealStateInstanceStateMap0, externalViewInstanceStateMap0, idealStateInstanceStateMap1, + externalViewInstanceStateMap1, onlineSegments); + } + + private List<String> createEnabledInstances(int count) { + List<String> enabledInstances = new ArrayList<>(); + for (int i = 0; i < count; i++) { + enabledInstances.add(String.format("instance-%d", i)); + } + return enabledInstances; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org