This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2f2ea415407 Replace Helix AutoRebalanceStrategy with deterministic algorithm (#16135) 2f2ea415407 is described below commit 2f2ea4154071a0e42b26f62bf8db93c04833fe9f Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Jun 18 12:52:53 2025 -0600 Replace Helix AutoRebalanceStrategy with deterministic algorithm (#16135) --- .../assignment/segment/SegmentAssignmentUtils.java | 83 +++-- .../BalancedNumSegmentAssignmentStrategy.java | 11 +- .../segment/SegmentAssignmentUtilsTest.java | 26 +- .../TableRebalancerClusterStatelessTest.java | 350 +++++++++------------ 4 files changed, 222 insertions(+), 248 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 4e5aec1b60e..027f8defba9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -24,14 +24,12 @@ import it.unimi.dsi.fastutil.ints.IntIntPair; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; import org.apache.helix.HelixManager; -import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; @@ -140,25 +138,72 @@ public class SegmentAssignmentUtils { return instancesAssigned; } - /** - * Rebalances the table with Helix AutoRebalanceStrategy. - */ - public static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStrategy( - Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) { - // Use Helix AutoRebalanceStrategy to rebalance the table - LinkedHashMap<String, Integer> states = new LinkedHashMap<>(); - states.put(SegmentStateModel.ONLINE, replication); - AutoRebalanceStrategy autoRebalanceStrategy = - new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states); - // Make a copy of the current assignment because this step might change the passed in assignment - Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>(); + /// Rebalances the table with non-replica-group based segment assignment strategy by uniformly spraying segment + /// replicas to the servers. + /// 1. Calculate the target number of segments on each server + /// 2. Loop over all the segments and keep the assignment if target number of segments for the server has not been + /// reached and track the not assigned segments + /// 3. Assign the left-over segments to the servers with the least segments, or the smallest index if there is a tie + public static Map<String, Map<String, String>> rebalanceNonReplicaGroupBasedTable( + Map<String, Map<String, String>> currentAssignment, List<String> servers, int replication) { + Map<String, Integer> serverIds = getInstanceNameToIdMap(servers); + + // Calculate target number of segments per server + // NOTE: in order to minimize the segment movements, use the ceiling of the quotient + int numServers = servers.size(); + int numSegments = currentAssignment.size(); + int targetNumSegmentsPerServer = (numSegments * replication + numServers - 1) / numServers; + + // Do not move segment if target number of segments is not reached, track the segments need to be moved + Map<String, Map<String, String>> newAssignment = new TreeMap<>(); + int[] numSegmentsAssignedPerServer = new int[numServers]; + List<String> segmentsNotAssigned = new ArrayList<>(); for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> instanceStateMap = entry.getValue(); - currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap)); + String segment = entry.getKey(); + Set<String> currentServers = entry.getValue().keySet(); + int remainingReplicas = replication; + for (String server : currentServers) { + Integer serverId = serverIds.get(server); + if (serverId != null && numSegmentsAssignedPerServer[serverId] < targetNumSegmentsPerServer) { + newAssignment.computeIfAbsent(segment, k -> new TreeMap<>()).put(server, SegmentStateModel.ONLINE); + numSegmentsAssignedPerServer[serverId]++; + remainingReplicas--; + if (remainingReplicas == 0) { + break; + } + } + } + for (int i = 0; i < remainingReplicas; i++) { + segmentsNotAssigned.add(segment); + } + } + + // Assign each not assigned segment to the server with the least segments, or the smallest id if there is a tie + PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numServers, Pairs.intPairComparator()); + for (int serverId = 0; serverId < numServers; serverId++) { + heap.add(new Pairs.IntPair(numSegmentsAssignedPerServer[serverId], serverId)); + } + List<Pairs.IntPair> skippedPairs = new ArrayList<>(); + for (String segment : segmentsNotAssigned) { + Map<String, String> instanceStateMap = newAssignment.computeIfAbsent(segment, k -> new TreeMap<>()); + while (true) { + Pairs.IntPair intPair = heap.remove(); + int serverId = intPair.getRight(); + String server = servers.get(serverId); + // Skip the server if it already has the segment + if (instanceStateMap.put(server, SegmentStateModel.ONLINE) == null) { + intPair.setLeft(intPair.getLeft() + 1); + heap.add(intPair); + break; + } else { + skippedPairs.add(intPair); + } + } + heap.addAll(skippedPairs); + skippedPairs.clear(); } - return autoRebalanceStrategy.computePartitionAssignment(instances, instances, currentAssignmentCopy, null) - .getMapFields(); + + return newAssignment; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java index e9c540da78a..c1270705848 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java @@ -42,17 +42,15 @@ import org.slf4j.LoggerFactory; public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class); - private String _tableNameWithType; private int _replication; @Override public void init(HelixManager helixManager, TableConfig tableConfig) { - _tableNameWithType = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); _replication = tableConfig.getReplication(); - LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}", - _tableNameWithType, _replication); + LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: {} with replication: {}", + tableConfig.getTableName(), _replication); } @Override @@ -66,12 +64,9 @@ public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentSt public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) { validateSegmentAssignmentStrategy(instancePartitions); - Map<String, Map<String, String>> newAssignment; List<String> instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication); - newAssignment = - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication); - return newAssignment; + return SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, _replication); } private void validateSegmentAssignmentStrategy(InstancePartitions instancePartitions) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java index 0f43b7869df..306e391ce0c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java @@ -76,8 +76,7 @@ public class SegmentAssignmentUtilsTest { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals( - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, NUM_REPLICAS), + assertEquals(SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, NUM_REPLICAS), currentAssignment); // Replace instance_0 with instance_10 @@ -85,8 +84,7 @@ public class SegmentAssignmentUtilsTest { String newInstanceName = INSTANCE_NAME_PREFIX + 10; newInstances.set(0, newInstanceName); Map<String, Map<String, String>> newAssignment = - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(currentAssignment.size(), numSegments); // Each segment should have 3 replicas @@ -116,8 +114,8 @@ public class SegmentAssignmentUtilsTest { // } int newNumInstances = numInstances - 5; newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas @@ -127,19 +125,19 @@ public class SegmentAssignmentUtilsTest { // The segments are not perfectly balanced, but should be deterministic numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); - assertEquals(numSegmentsAssignedPerInstance[0], 56); + assertEquals(numSegmentsAssignedPerInstance[0], 60); assertEquals(numSegmentsAssignedPerInstance[1], 60); assertEquals(numSegmentsAssignedPerInstance[2], 60); assertEquals(numSegmentsAssignedPerInstance[3], 60); - assertEquals(numSegmentsAssignedPerInstance[4], 64); + assertEquals(numSegmentsAssignedPerInstance[4], 60); numSegmentsToMovePerInstance = SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, newAssignment); assertEquals(numSegmentsToMovePerInstance.size(), numInstances); - assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), IntIntPair.of(26, 0)); + assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3), IntIntPair.of(30, 0)); - assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), IntIntPair.of(34, 0)); + assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), IntIntPair.of(30, 0)); for (int i = 5; i < 10; i++) { assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), IntIntPair.of(0, 30)); } @@ -150,8 +148,8 @@ public class SegmentAssignmentUtilsTest { // } newNumInstances = numInstances + 5; newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas @@ -182,8 +180,8 @@ public class SegmentAssignmentUtilsTest { // } String newInstanceNamePrefix = "i_"; newInstances = SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 64b8a6e5f5d..f4964021ea9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -99,7 +99,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); } - /// /// Dropping instance from cluster requires waiting for live instance gone and removing instance related ZNodes, which /// are not the purpose of the test, so combine different rebalance scenarios into one test: /// 1. NO_OP rebalance @@ -107,7 +106,6 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { /// 3. Migrate to replica-group based segment assignment and rebalance /// 4. Migrate back to non-replica-group based segment assignment and rebalance /// 5. Remove (disable) servers and rebalance - /// @Test public void testRebalance() throws Exception { @@ -119,16 +117,14 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { for (int i = 0; i < numServers; i++) { String instanceId = SERVER_INSTANCE_ID_PREFIX + i; addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo1 = - new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); - diskUsageInfoMap.put(instanceId, diskUsageInfo1); + DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); + diskUsageInfoMap.put(instanceId, diskUsageInfo); } ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 1); - TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -175,8 +171,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); @@ -211,8 +206,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { for (int i = 0; i < numServersToAdd; i++) { String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i); addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo = - new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); + DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis()); diskUsageInfoMap.put(instanceId, diskUsageInfo); } @@ -227,18 +221,17 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 14); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 14); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 15); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 15); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 14); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 15); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS - 14); + numSegments * NUM_REPLICAS - 15); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); @@ -251,19 +244,19 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Original servers should be losing some segments String newServer = SERVER_INSTANCE_ID_PREFIX + i; RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); - assertTrue(serverSegmentChange.getSegmentsDeleted() > 0); - assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); - assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); - assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0); + assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 10); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); assertEquals(serverSegmentChange.getSegmentsAdded(), 0); + assertEquals(serverSegmentChange.getSegmentsDeleted(), 5); + assertEquals(serverSegmentChange.getSegmentsUnchanged(), 5); } for (int i = 0; i < numServersToAdd; i++) { // New servers should only get new segments String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i); RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); - assertTrue(serverSegmentChange.getSegmentsAdded() > 0); assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0); - assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), serverSegmentChange.getSegmentsAdded()); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); + assertEquals(serverSegmentChange.getSegmentsAdded(), 5); assertEquals(serverSegmentChange.getSegmentsDeleted(), 0); assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0); } @@ -301,16 +294,14 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { "Instance assignment not allowed, no need for minimizeDataMovement"); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) - .getMessage() - .startsWith("Within threshold")); + assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) + .getMessage() + .startsWith("Within threshold")); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) - .getMessage() - .startsWith("Within threshold")); + assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) + .getMessage() + .startsWith("Within threshold")); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), @@ -336,17 +327,17 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { Map<String, IntIntPair> instanceToNumSegmentsToMoveMap = SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment, newSegmentAssignment); assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers + numServersToAdd); - for (int i = 0; i < numServersToAdd; i++) { - IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); - assertNotNull(numSegmentsToMove); - assertTrue(numSegmentsToMove.leftInt() > 0); - assertEquals(numSegmentsToMove.rightInt(), 0); - } for (int i = 0; i < numServers; i++) { IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i); assertNotNull(numSegmentsToMove); assertEquals(numSegmentsToMove.leftInt(), 0); - assertTrue(numSegmentsToMove.rightInt() > 0); + assertEquals(numSegmentsToMove.rightInt(), 5); + } + for (int i = 0; i < numServersToAdd; i++) { + IntIntPair numSegmentsToMove = instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers + i)); + assertNotNull(numSegmentsToMove); + assertEquals(numSegmentsToMove.leftInt(), 5); + assertEquals(numSegmentsToMove.rightInt(), 0); } // Dry-run mode should not change the IdealState @@ -416,25 +407,25 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals( rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) - .getMessage(), "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments " - + "- numReplicaGroups: " + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many instances as " - + "possible)"); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments " + + "- numReplicaGroups: " + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many instances as " + + "possible)"); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 11); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 11); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 20); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 20); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 11); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 20); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS - 11); + numSegments * NUM_REPLICAS - 20); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); @@ -445,11 +436,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { for (int i = 0; i < numServers + numServersToAdd; i++) { String newServer = SERVER_INSTANCE_ID_PREFIX + i; RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 5); assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); - // Ensure not all segments moved - assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); - // Ensure all segments has something assigned prior to rebalance - assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); } // Dry-run mode should not change the IdealState @@ -515,8 +503,9 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals( rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) - .getMessage(), "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -528,11 +517,9 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); @@ -557,8 +544,9 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals( rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO) - .getMessage(), "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -571,11 +559,9 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers + numServersToAdd); assertNotNull(rebalanceResult.getInstanceAssignment()); @@ -628,13 +614,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); assertNotNull(rebalanceSummaryResult.getTagsInfo()); assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), - TagNameUtils.getOfflineTagForTenant(null)); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(null)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 15); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 15); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -701,10 +685,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { preChecker.init(_helixResourceManager, executorService, 1); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Set up the table with 1 replication factor and strict replica group enabled - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1) - .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, - false)).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNumReplicas(1) + .setRoutingConfig( + new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -783,20 +768,17 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, false, null); - InstanceAssignmentConfig instanceAssignmentConfig = - new InstanceAssignmentConfig( - new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, null), null, - replicaGroupPartitionConfig, - InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setNumReplicas(numReplicas) - .setRoutingConfig( - new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)) - .setStreamConfigs( - FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap()) - .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) - .build(); + InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig( + new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0, null), null, + replicaGroupPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(numReplicas) + .setRoutingConfig( + new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap()) + .setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig)) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -983,10 +965,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { preChecker.init(_helixResourceManager, executorService, 1); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Set up the table with 1 replication factor and strict replica group enabled - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1) - .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, - false)).build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setNumReplicas(1) + .setRoutingConfig( + new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -1031,16 +1014,14 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { for (int i = 0; i < numServers; i++) { String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + i; addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo1 = - new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis()); + DiskUsageInfo diskUsageInfo1 = new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis()); diskUsageInfoMap.put(instanceId, diskUsageInfo1); } ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 0.5); - TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); @@ -1062,8 +1043,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { for (int i = 0; i < numServersToAdd; i++) { String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + (numServers + i); addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); - DiskUsageInfo diskUsageInfo = - new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis()); + DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 200L, System.currentTimeMillis()); diskUsageInfoMap.put(instanceId, diskUsageInfo); } @@ -1081,22 +1061,19 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) - .getMessage() - .startsWith("Within threshold")); + assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE) + .getMessage() + .startsWith("Within threshold")); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); - assertTrue( - preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) - .getMessage() - .startsWith("Within threshold")); + assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE) + .getMessage() + .startsWith("Within threshold")); for (int i = 0; i < numServers + numServersToAdd; i++) { String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX + i; - DiskUsageInfo diskUsageInfo = - new DiskUsageInfo(instanceId, "", 1000L, 755L, System.currentTimeMillis()); + DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L, 755L, System.currentTimeMillis()); diskUsageInfoMap.put(instanceId, diskUsageInfo); } @@ -1139,13 +1116,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); preChecker.init(_helixResourceManager, executorService, 0.5); - TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setNumReplicas(2) - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .build(); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(2) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -1187,13 +1162,12 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { rebalanceConfig.setUpdateTargetTier(false); rebalanceConfig.setBootstrap(false); rebalanceConfig.setBestEfforts(false); - tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setTierConfigList(Collections.singletonList( - new TierConfig("dummyTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null, - TierFactory.PINOT_SERVER_STORAGE_TYPE, - TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null, null))) - .build(); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setTierConfigList(Collections.singletonList( + new TierConfig("dummyTier", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null, + TierFactory.PINOT_SERVER_STORAGE_TYPE, + TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null, null))) + .build(); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS); @@ -1218,8 +1192,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { "Number of replicas (3) is greater than 1, downtime is not recommended."); // no downtime warning with 1 replica - newTableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build(); + newTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build(); rebalanceConfig.setDowntime(true); rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null); @@ -1312,8 +1285,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(preCheckerResult.getMessage(), "Number of segments to add to a single server (" + expectedNumSegmentsToAdd + ") is high (>" + DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is recommended to set batchSizePerServer to " - + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE - + " or lower to avoid excessive load on servers."); + + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE + " or lower to avoid excessive load on servers."); rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE); rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null); @@ -1330,13 +1302,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { executorService.shutdown(); } - /** - * Tests rebalance with tier configs - * Add 10 segments, with segment metadata end time 3 days apart starting from now to 30 days ago - * 1. run rebalance - should see no change - * 2. add nodes for tiers and run rebalance - should see no change - * 3. add tier config and run rebalance - should see changed assignment - */ + /// Tests rebalance with tier configs + /// Add 10 segments, with segment metadata end time 3 days apart starting from now to 30 days ago + /// 1. run rebalance - should see no change + /// 2. add nodes for tiers and run rebalance - should see no change + /// 3. add tier config and run rebalance - should see changed assignment @Test public void testRebalanceWithTiers() throws Exception { @@ -1387,10 +1357,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1428,10 +1396,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1554,8 +1520,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); ExecutorService executorService = Executors.newFixedThreadPool(10); preChecker.init(_helixResourceManager, executorService, 1); - TableRebalancer tableRebalancer = - new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader); // Try dry-run summary mode RebalanceConfig rebalanceConfig = new RebalanceConfig(); @@ -1575,10 +1540,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1612,10 +1575,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(), TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)); assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), - numSegments * NUM_REPLICAS); - assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), - numServers); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); + assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers); assertNotNull(rebalanceResult.getInstanceAssignment()); assertNotNull(rebalanceResult.getSegmentAssignment()); @@ -1662,14 +1623,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) .getNumSegmentsToDownload(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumSegmentsUnchanged(), - 0); + .getNumSegmentsUnchanged(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumServerParticipants(), - 0); + .getNumServerParticipants(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) - .getNumSegmentsToDownload(), - numSegments * NUM_REPLICAS); + .getNumSegmentsToDownload(), numSegments * NUM_REPLICAS); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) .getNumSegmentsUnchanged(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) @@ -1709,7 +1667,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { rebalanceConfig.setPreChecks(true); rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); assertEquals( rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); @@ -1721,7 +1679,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); assertNotNull(rebalanceSummaryResult.getSegmentInfo()); - assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 13); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); assertNotNull(rebalanceSummaryResult.getTagsInfo()); @@ -1734,16 +1692,13 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) .getNumSegmentsToDownload(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumSegmentsUnchanged(), - 0); + .getNumSegmentsUnchanged(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumServerParticipants(), - 0); + .getNumServerParticipants(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) - .getNumSegmentsToDownload(), - 13); + .getNumSegmentsToDownload(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) - .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13); + .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) .getNumServerParticipants(), 6); assertNotNull(rebalanceResult.getInstanceAssignment()); @@ -1751,7 +1706,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertNotNull(rebalanceResult.getSegmentAssignment()); rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); - assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME)); InstancePartitions instancePartitions = rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME); @@ -1823,26 +1778,20 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) .getNumSegmentsToDownload(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumSegmentsUnchanged(), - 0); + .getNumSegmentsUnchanged(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + NO_TIER_NAME)) - .getNumServerParticipants(), - 0); + .getNumServerParticipants(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) - .getNumSegmentsToDownload(), - 0); + .getNumSegmentsToDownload(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) .getNumSegmentsUnchanged(), 0); assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment" + TIER_A_NAME)) .getNumServerParticipants(), 0); - assertEquals( - tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(), + assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(), 0); - assertEquals( - tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(), + assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS); - assertEquals( - tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(), + assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(), 6); _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME); @@ -1852,7 +1801,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { @Test public void testRebalanceWithMinimizeDataMovementBalanced() throws Exception { - int numServers = 6; + int numServers = 3; for (int i = 0; i < numServers; i++) { addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" + SERVER_INSTANCE_ID_PREFIX + i, true); @@ -2111,13 +2060,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = Mockito.mock(ConsumingSegmentInfoReader.class); - TableRebalancer tableRebalancerOriginal = - new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setNumReplicas(numReplica) - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .build(); + TableRebalancer tableRebalancerOriginal = new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(numReplica) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -2149,11 +2096,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0); assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(), 0); assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(), 0); - assertEquals(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() - .size(), 0); - assertTrue(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() + assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(), 0); + assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary() .values() .stream() .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 0)); @@ -2168,11 +2112,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0); assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(), 0); assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(), 0); - assertEquals(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() - .size(), 0); - assertTrue(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() + assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(), 0); + assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary() .values() .stream() .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 0)); @@ -2203,11 +2144,8 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(), FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS); - assertEquals(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() - .size(), numServers); - assertTrue(consumingSegmentToBeMovedSummary - .getServerConsumingSegmentSummary() + assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(), numServers); + assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary() .values() .stream() .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() @@ -2231,13 +2169,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); } - TableRebalancer tableRebalancerOriginal = - new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); - TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) - .setNumReplicas(numReplica) - .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) - .build(); + TableRebalancer tableRebalancerOriginal = new TableRebalancer(_helixManager, null, null, null, _tableSizeReader); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(numReplica) + .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()) + .build(); // Create the table addDummySchema(RAW_TABLE_NAME); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org