This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8ab0e49b2e Add a New Pre-check Item for Replica Groups Info (#15575) 8ab0e49b2e is described below commit 8ab0e49b2e828cbde77a16f4b09b5b6833121ef4 Author: Jhow <44998515+j-howhu...@users.noreply.github.com> AuthorDate: Thu Apr 24 14:18:54 2025 -0700 Add a New Pre-check Item for Replica Groups Info (#15575) --- .../core/rebalance/DefaultRebalancePreChecker.java | 62 +- .../TableRebalancerClusterStatelessTest.java | 52 +- .../tests/HybridClusterIntegrationTest.java | 7 +- .../tests/OfflineClusterIntegrationTest.java | 77 +- .../tests/TableRebalanceIntegrationTest.java | 821 +++++++++++++++++++++ 5 files changed, 1002 insertions(+), 17 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index ef5801d109..7ef7d53c00 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -39,8 +39,10 @@ import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.controller.validation.ResourceUtilizationInfo; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.spi.utils.Enablement; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; @@ -53,6 +55,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { 
public static final String DISK_UTILIZATION_DURING_REBALANCE = "diskUtilizationDuringRebalance"; public static final String DISK_UTILIZATION_AFTER_REBALANCE = "diskUtilizationAfterRebalance"; public static final String REBALANCE_CONFIG_OPTIONS = "rebalanceConfigOptions"; + public static final String REPLICA_GROUPS_INFO = "replicaGroupsInfo"; private static double _diskUtilizationThreshold; @@ -99,6 +102,8 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { preCheckResult.put(REBALANCE_CONFIG_OPTIONS, checkRebalanceConfig(rebalanceConfig, tableConfig, preCheckContext.getCurrentAssignment(), preCheckContext.getTargetAssignment())); + preCheckResult.put(REPLICA_GROUPS_INFO, checkReplicaGroups(tableConfig, rebalanceConfig)); + tableRebalanceLogger.info("End pre-checks"); return preCheckResult; } @@ -198,7 +203,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { : RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); } return RebalancePreCheckerResult.warn( - "minimizeDataMovement is not enabled for CONSUMING segments but instance assignment is allowed"); + "minimizeDataMovement is not enabled for CONSUMING segments, but instance assignment is allowed"); } return RebalancePreCheckerResult.pass("Instance assignment not allowed, no need for minimizeDataMovement"); } @@ -378,6 +383,61 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { : RebalancePreCheckerResult.warn(StringUtil.join("\n", warnings.toArray(String[]::new))); } + private RebalancePreCheckerResult checkReplicaGroups(TableConfig tableConfig, RebalanceConfig rebalanceConfig) { + String message; + boolean hasAnyReplicaGroup; + if (tableConfig.getTableType() == TableType.OFFLINE) { + message = "OFFLINE segments - " + getReplicaGroupInfo(tableConfig, InstancePartitionsType.OFFLINE.toString()); + hasAnyReplicaGroup = isReplicaGroupEnabled(tableConfig, InstancePartitionsType.OFFLINE.toString()); + } else { + // for realtime table 
+ message = + "COMPLETED segments - " + getReplicaGroupInfo(tableConfig, InstancePartitionsType.COMPLETED.toString()) + "\n" + + "CONSUMING segments - " + getReplicaGroupInfo(tableConfig, InstancePartitionsType.CONSUMING.toString()); + hasAnyReplicaGroup = + isReplicaGroupEnabled(tableConfig, InstancePartitionsType.COMPLETED.toString()) || isReplicaGroupEnabled( + tableConfig, InstancePartitionsType.CONSUMING.toString()); + } + String tierMessage = ""; + if (tableConfig.getTierConfigsList() != null) { + List<String> tierMessageList = new ArrayList<>(); + for (TierConfig tierConfig : tableConfig.getTierConfigsList()) { + tierMessageList.add(tierConfig.getName() + " tier - " + getReplicaGroupInfo(tableConfig, tierConfig.getName())); + hasAnyReplicaGroup |= isReplicaGroupEnabled(tableConfig, tierConfig.getName()); + } + tierMessage = "\n" + StringUtil.join("\n", tierMessageList.toArray(String[]::new)); + } + if (hasAnyReplicaGroup && !rebalanceConfig.isReassignInstances()) { + return RebalancePreCheckerResult.warn( + "reassignInstances is disabled, replica groups may not be updated.\n" + message + tierMessage); + } + return RebalancePreCheckerResult.pass(message + tierMessage); + } + + private static boolean isReplicaGroupEnabled(TableConfig tableConfig, String typeOrTier) { + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); + return instanceAssignmentConfigMap != null && instanceAssignmentConfigMap.containsKey(typeOrTier) + && instanceAssignmentConfigMap.get(typeOrTier).getReplicaGroupPartitionConfig().isReplicaGroupBased(); + } + + private static String getReplicaGroupInfo(TableConfig tableConfig, String typeOrTier) { + if (!isReplicaGroupEnabled(tableConfig, typeOrTier)) { + return "Replica Groups are not enabled, replication: " + tableConfig.getReplication(); + } + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); + 
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + instanceAssignmentConfigMap.get(typeOrTier).getReplicaGroupPartitionConfig(); + + int numReplicaGroups = instanceReplicaGroupPartitionConfig.getNumReplicaGroups(); + int numInstancePerReplicaGroup = instanceReplicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + if (numInstancePerReplicaGroup == 0) { + return "numReplicaGroups: " + numReplicaGroups + + ", numInstancesPerReplicaGroup: 0 (using as many instances as possible)"; + } + return "numReplicaGroups: " + numReplicaGroups + + ", numInstancesPerReplicaGroup: " + numInstancePerReplicaGroup; + } + private DiskUsageInfo getDiskUsageInfoOfInstance(String instanceId) { // This method currently depends on the controller's periodic task that fetches disk utilization of all instances // every 5 minutes by default. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index 1141c80081..49710aa1e9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -265,17 +265,19 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); rebalanceConfig.setPreChecks(true); + rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); Map<String, RebalancePreCheckerResult> preCheckResult = rebalanceResult.getPreChecksResult(); assertNotNull(preCheckResult); - assertEquals(preCheckResult.size(), 5); + 
assertEquals(preCheckResult.size(), 6); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); + assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)); // Sending request to servers should fail for all, so needsPreprocess should be set to "error" to indicate that a // manual check is needed assertEquals(preCheckResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(), @@ -302,6 +304,10 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { RebalancePreCheckerResult.PreCheckStatus.PASS); assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), "All rebalance parameters look good"); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); // All servers should be assigned to the table instanceAssignment = rebalanceResult.getInstanceAssignment(); @@ -390,8 +396,16 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // No need to reassign instances because instances should be automatically assigned when updating the table config rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); 
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + // Though instance partition map is set in ZK, the pre-checker is unaware of that, a warning will be thrown + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + NUM_REPLICAS + ", numInstancesPerReplicaGroup: 0 (using as many instances as possible)"); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -479,8 +493,14 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // no movement should occur rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -512,9 +532,15 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Try dry-run summary mode with reassignment rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + 
rebalanceConfig.setPreChecks(true); rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -1146,7 +1172,11 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { Map<String, Map<String, String>> oldSegmentAssignment = _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields(); - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); + DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); + ExecutorService executorService = Executors.newFixedThreadPool(10); + preChecker.init(_helixResourceManager, executorService, 1); + TableRebalancer tableRebalancer = + new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader()); // Try dry-run summary mode RebalanceConfig rebalanceConfig = new RebalanceConfig(); @@ -1225,8 +1255,16 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Try dry-run summary mode rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals( + 
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS + "\n" + TIER_A_NAME + + " tier - Replica Groups are not enabled, replication: " + NUM_REPLICAS); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceResult.getRebalanceSummaryResult()); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -1287,8 +1325,17 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { // Try dry-run summary mode rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setReassignInstances(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + assertEquals( + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + "OFFLINE segments - Replica Groups are not enabled, replication: " + NUM_REPLICAS + "\n" + TIER_A_NAME + + " tier - numReplicaGroups: " + NUM_REPLICAS + + ", numInstancesPerReplicaGroup: 0 (using as many instances as possible)"); rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); assertNotNull(rebalanceSummaryResult); assertNotNull(rebalanceSummaryResult.getServerInfo()); @@ -1418,6 +1465,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { 6); _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME); + executorService.shutdown(); } @Test diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index b948293c08..845d04970e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -58,6 +58,9 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet private static final String TENANT_NAME = "TestTenant"; private static final int NUM_OFFLINE_SEGMENTS = 8; private static final int NUM_REALTIME_SEGMENTS = 6; + protected static final int NUM_SERVERS_OFFLINE = 1; + protected static final int NUM_SERVERS_REALTIME = 1; + protected static final int NUM_SERVERS = NUM_SERVERS_OFFLINE + NUM_SERVERS_REALTIME; @Override protected String getBrokerTenant() { @@ -138,11 +141,11 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet _helixManager.getConfigAccessor() .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM, Integer.toString(12)); startBroker(); - startServers(2); + startServers(NUM_SERVERS); startKafka(); // Create tenants - createServerTenant(TENANT_NAME, 1, 1); + createServerTenant(TENANT_NAME, NUM_SERVERS_OFFLINE, NUM_SERVERS_REALTIME); } @Test diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index f7b8604c9b..8ee4159901 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -830,16 +830,26 @@ public class OfflineClusterIntegrationTest 
extends BaseClusterIntegrationTestSet "Instance assignment not allowed, no need for minimizeDataMovement", RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), RebalancePreCheckerResult.PreCheckStatus.PASS); // Enable minimizeDataMovement - tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap(true)); + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = createInstanceAssignmentConfigMap(true); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", - RebalancePreCheckerResult.PreCheckStatus.PASS); + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); // Override minimizeDataMovement rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); @@ -848,17 +858,29 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "minimizeDataMovement is enabled in table config but it's overridden with disabled", RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", - RebalancePreCheckerResult.PreCheckStatus.PASS); + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); // Use default minimizeDataMovement and disable it in table config - tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap(false)); + instanceAssignmentConfigMap = createInstanceAssignmentConfigMap(false); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, "minimizeDataMovement is not enabled but instance assignment is allowed", RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", - 
RebalancePreCheckerResult.PreCheckStatus.PASS); + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); // Undo minimizeDataMovement, update the table config to add a column to bloom filter rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); @@ -870,6 +892,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "Instance assignment not allowed, no need for minimizeDataMovement", RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), RebalancePreCheckerResult.PreCheckStatus.PASS); // Undo tableConfig change @@ -880,6 +904,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "Instance assignment not allowed, no need for minimizeDataMovement", RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), RebalancePreCheckerResult.PreCheckStatus.PASS); // Add a new server (to force change in instance assignment) and enable reassignInstances @@ -894,6 +920,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "Instance assignment not allowed, no need 
for minimizeDataMovement", RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), RebalancePreCheckerResult.PreCheckStatus.PASS); rebalanceConfig.setReassignInstances(false); @@ -911,25 +939,41 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet "Instance assignment not allowed, no need for minimizeDataMovement", RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), RebalancePreCheckerResult.PreCheckStatus.PASS); // Keep schema change and update table config to add minimizeDataMovement - tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap(true)); + instanceAssignmentConfigMap = createInstanceAssignmentConfigMap(true); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", - RebalancePreCheckerResult.PreCheckStatus.PASS); + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + 
replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); // Keep schema change and update table config to add instance config map with minimizeDataMovement = false - tableConfig.setInstanceAssignmentConfigMap(createInstanceAssignmentConfigMap(false)); + instanceAssignmentConfigMap = createInstanceAssignmentConfigMap(false); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", - RebalancePreCheckerResult.PreCheckStatus.PASS); + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); // Add a new server (to force change in instance assignment) and enable reassignInstances // Trigger rebalance config warning @@ -946,7 +990,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet RebalancePreCheckerResult.PreCheckStatus.WARN, "bestEfforts is enabled, only enable it if you know what you are doing\n" + "bootstrap is enabled which can cause a large amount of data movement, double check if this is " - + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN); + + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); // Disable dry-run rebalanceConfig.setBootstrap(false); @@ -954,6 +1000,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet rebalanceConfig.setDryRun(false); rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); assertNull(rebalanceResult.getPreChecksResult()); + // Expect FAILED: Pre-checks can only be enabled in dry-run mode, not triggering rebalance assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); // Stop the added server @@ -965,16 +1012,18 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, String expectedMinimizeDataMovement, RebalancePreCheckerResult.PreCheckStatus expectedMinimizeDataMovementStatus, String expectedNeedsReloadMessage, RebalancePreCheckerResult.PreCheckStatus expectedNeedsReloadStatus, - String expectedRebalanceConfig, RebalancePreCheckerResult.PreCheckStatus expectedRebalanceConfigStatus) { + String expectedRebalanceConfig, 
RebalancePreCheckerResult.PreCheckStatus expectedRebalanceConfigStatus, + String expectedReplicaGroupMessage, RebalancePreCheckerResult.PreCheckStatus expectedReplicaGroupStatus) { assertEquals(rebalanceResult.getStatus(), expectedStatus); Map<String, RebalancePreCheckerResult> preChecksResult = rebalanceResult.getPreChecksResult(); assertNotNull(preChecksResult); - assertEquals(preChecksResult.size(), 5); + assertEquals(preChecksResult.size(), 6); assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)); assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(), expectedMinimizeDataMovementStatus); assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), @@ -987,6 +1036,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet expectedRebalanceConfigStatus); assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), expectedRebalanceConfig); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + expectedReplicaGroupStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + expectedReplicaGroupMessage); // As the disk utilization check periodic task was disabled in the test controller (ControllerConf // .RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY was set to 30000s, see 
org.apache.pinot.controller.helix // .ControllerTest.getDefaultControllerConfiguration), server's disk util should be unavailable on all servers if diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java new file mode 100644 index 0000000000..ca2c022b12 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java @@ -0,0 +1,821 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.common.utils.regex.JavaUtilPattern; +import org.apache.pinot.common.utils.regex.Matcher; +import org.apache.pinot.common.utils.regex.Pattern; +import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse; +import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResult; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TagOverrideConfig; +import org.apache.pinot.spi.config.table.TenantConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.Enablement; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.util.TestUtils; 
+import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +public class TableRebalanceIntegrationTest extends HybridClusterIntegrationTest { + /** + * Builds the query-string portion of a rebalance request from the given config. + * + * <p>Emits the following parameters as {@code key=value} pairs joined by {@code &}, in this order: dryRun, + * preChecks, reassignInstances, includeConsuming, minimizeDataMovement, bootstrap, downtime, + * minAvailableReplicas, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, + * updateTargetTier, heartbeatIntervalInMs, heartbeatTimeoutInMs, maxAttempts, retryInitialDelayInMs. + * The result has no leading {@code ?} or {@code &}; values are appended by plain string concatenation and are + * not URL-encoded. + * + * @param rebalanceConfig config whose getters supply the parameter values + * @return the query string, starting with {@code dryRun=} + */ + private static String getQueryString(RebalanceConfig rebalanceConfig) { + return "dryRun=" + rebalanceConfig.isDryRun() + "&preChecks=" + rebalanceConfig.isPreChecks() + + "&reassignInstances=" + rebalanceConfig.isReassignInstances() + + "&includeConsuming=" + rebalanceConfig.isIncludeConsuming() + + "&minimizeDataMovement=" + rebalanceConfig.getMinimizeDataMovement() + + "&bootstrap=" + rebalanceConfig.isBootstrap() + "&downtime=" + rebalanceConfig.isDowntime() + + "&minAvailableReplicas=" + rebalanceConfig.getMinAvailableReplicas() + + "&bestEfforts=" + rebalanceConfig.isBestEfforts() + + "&externalViewCheckIntervalInMs=" + rebalanceConfig.getExternalViewCheckIntervalInMs() + + "&externalViewStabilizationTimeoutInMs=" + rebalanceConfig.getExternalViewStabilizationTimeoutInMs() + + "&updateTargetTier=" + rebalanceConfig.isUpdateTargetTier() + + "&heartbeatIntervalInMs=" + rebalanceConfig.getHeartbeatIntervalInMs() + + "&heartbeatTimeoutInMs=" + rebalanceConfig.getHeartbeatTimeoutInMs() + + "&maxAttempts=" + rebalanceConfig.getMaxAttempts() + + "&retryInitialDelayInMs=" + rebalanceConfig.getRetryInitialDelayInMs(); + } + + /** + * Builds the full rebalance URL for the table returned by {@code getTableName()}. + * + * <p>The path is the controller base URL, {@code tables}, the table name and {@code rebalance} joined with + * {@code /}; it is followed by {@code ?type=<tableType>&} and the output of + * {@link #getQueryString(RebalanceConfig)}. + * + * @param rebalanceConfig config serialized into the query string + * @param tableType value written into the {@code type} query parameter + * @return the URL string that the tests in this class pass to {@code sendPostRequest} + */ + private String getRebalanceUrl(RebalanceConfig rebalanceConfig, TableType tableType) { + return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(), "tables", getTableName(), "rebalance") + + "?type=" + tableType.toString() + "&" + getQueryString(rebalanceConfig); + } + + @Test + public void testRealtimeRebalancePreCheckMinimizeDataMovement() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + 
TableConfig tableConfig = getRealtimeTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure pre-check status is null if not enabled + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setIncludeConsuming(true); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Use MinimizeDataMovementOptions.DEFAULT and disable it in table config for COMPLETED ONLY + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for COMPLETED segments, but instance assignment is allowed", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + 
RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for COMPLETED segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DEFAULT and disable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for CONSUMING segments, but instance assignment is allowed", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DISABLE and enable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Set minimizeDataMovement to true and false respectively for COMPLETED and CONSUMING segments + instanceAssignmentConfigMap = new HashMap<>(); + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for either or both COMPLETED and CONSUMING segments, but instance " + + "assignment is allowed for both", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.ENABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for both COMPLETED and CONSUMING segments in table config but it's " + + "overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for either or both COMPLETED and CONSUMING segments, but instance " + + "assignment is allowed for both", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DISABLE and enable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(), getServerTenant(), + new TagOverrideConfig(null, TagNameUtils.getRealtimeTagForTenant(getServerTenant()))); + tableConfig.setTenantConfig(tenantConfig); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + updateTableConfig(originalTableConfig); + } + + @Test + public void testRealtimeRebalancePreChecks() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getRealtimeTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure pre-check status is null if not enabled + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + + // Enable pre-checks, nothing is set + rebalanceConfig.setPreChecks(true); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "includeConsuming is disabled for a realtime table.", + RebalancePreCheckerResult.PreCheckStatus.WARN, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Enable minimizeDataMovement, enable replica group only for COMPLETED segments + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + rebalanceConfig.setIncludeConsuming(true); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Undo minimizeDataMovement, update the table config to add a column to bloom filter + rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE); + tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter"); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Undo tableConfig change + tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter"); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + 
RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Validate that the status for reload is still PASS (i.e. even though an extra server is tagged which has no + // segments assigned for this table, we don't try to get needReload status from that extra server, otherwise + // ERROR status would be returned) + BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS); + createServerTenant(getServerTenant(), 0, 1); + rebalanceConfig.setReassignInstances(true); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + rebalanceConfig.setReassignInstances(false); + + // Stop the added server + serverStarter0.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter0.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + + // Add a schema change. 
Notice that this may affect other following tests + Schema schema = getSchema(getTableName()); + schema.addField(new MetricFieldSpec("NewAddedIntMetricB", FieldSpec.DataType.INT, 1)); + updateSchema(schema); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Keep schema change and update table config to add minimizeDataMovement + instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may 
not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Keep schema change and update table config to add instance config map with minimizeDataMovement = false + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? 
"0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Trigger rebalance config warning + BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1); + createServerTenant(getServerTenant(), 0, 1); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setBestEfforts(true); + rebalanceConfig.setBootstrap(true); + rebalanceConfig.setMinAvailableReplicas(-1); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, + "bestEfforts is enabled, only enable it if you know what you are doing\n" + + "bootstrap is enabled which can cause a large amount of data movement, double check if this is " + + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + response = + sendPostRequest(getControllerRequestURLBuilder().forTableReload(getTableName(), TableType.REALTIME, false)); + waitForReloadToComplete(getReloadJobIdFromResponse(response), 30_000); + + rebalanceConfig.setBestEfforts(false); + rebalanceConfig.setBootstrap(false); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = 
JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Stop the added server + serverStarter1.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter1.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + updateTableConfig(originalTableConfig); + } + + private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, + String expectedMinimizeDataMovement, RebalancePreCheckerResult.PreCheckStatus expectedMinimizeDataMovementStatus, + String expectedNeedsReloadMessage, RebalancePreCheckerResult.PreCheckStatus expectedNeedsReloadStatus, + String expectedRebalanceConfig, RebalancePreCheckerResult.PreCheckStatus expectedRebalanceConfigStatus, + String expectedReplicaGroupMessage, RebalancePreCheckerResult.PreCheckStatus expectedReplicaGroupStatus) { + assertEquals(rebalanceResult.getStatus(), expectedStatus); + Map<String, RebalancePreCheckerResult> preChecksResult = rebalanceResult.getPreChecksResult(); + assertNotNull(preChecksResult); + assertEquals(preChecksResult.size(), 6); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + 
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(), + expectedMinimizeDataMovementStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + expectedMinimizeDataMovement); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(), + expectedNeedsReloadStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(), + expectedNeedsReloadMessage); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(), + expectedRebalanceConfigStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), + expectedRebalanceConfig); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + expectedReplicaGroupStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + expectedReplicaGroupMessage); + // As the disk utilization check periodic task was disabled in the test controller (ControllerConf + // .RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY was set to 30000s, see org.apache.pinot.controller.helix + // .ControllerTest.getDefaultControllerConfiguration), server's disk util should be unavailable on all servers if + // not explicitly set via org.apache.pinot.controller.validation.ResourceUtilizationInfo.setDiskUsageInfo + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + 
assertEquals(preChecksResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + } + + private InstanceAssignmentConfig createInstanceAssignmentConfig(boolean minimizeDataMovement, TableType tableType) { + InstanceTagPoolConfig instanceTagPoolConfig = + new InstanceTagPoolConfig(TagNameUtils.getServerTagForTenant(getServerTenant(), tableType), false, 1, null); + List<String> constraints = new ArrayList<>(); + constraints.add("constraints1"); + InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(constraints); + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 1, 1, + 1, 1, 1, minimizeDataMovement, + null); + return new InstanceAssignmentConfig(instanceTagPoolConfig, + instanceConstraintConfig, instanceReplicaGroupPartitionConfig, null, minimizeDataMovement); + } + + @Test + public void testRealtimeRebalanceDryRunSummary() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getRealtimeTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure summary status is non-null always + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNotNull(rebalanceResult.getRebalanceSummaryResult()); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, false, NUM_SERVERS_REALTIME, + NUM_SERVERS_REALTIME, + tableConfig.getReplication(), true); + + updateTableConfig(originalTableConfig); + } + + private void checkRebalanceDryRunSummary(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, + boolean isSegmentsToBeMoved, int existingNumServers, int newNumServers, int 
replicationFactor, + boolean isRealtime) { + assertEquals(rebalanceResult.getStatus(), expectedStatus); + RebalanceSummaryResult summaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(summaryResult); + assertNotNull(summaryResult.getServerInfo()); + assertNotNull(summaryResult.getSegmentInfo()); + assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(), replicationFactor, + "Existing replication factor doesn't match expected"); + assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(), + summaryResult.getSegmentInfo().getReplicationFactor().getExpectedValueAfterRebalance(), + "Existing and new replication factor doesn't match"); + assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), existingNumServers, + "Existing number of servers don't match"); + assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), newNumServers, + "New number of servers don't match"); + // In this cluster integration test, servers are tagged with DefaultTenant only + assertEquals(summaryResult.getTagsInfo().size(), 1); + if (isRealtime) { + assertEquals(summaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getRealtimeTagForTenant(getServerTenant())); + } else { + assertEquals(summaryResult.getTagsInfo().get(0).getTagName(), + TagNameUtils.getOfflineTagForTenant(getServerTenant())); + } + assertEquals(summaryResult.getTagsInfo().get(0).getNumServerParticipants(), newNumServers); + assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), + summaryResult.getTagsInfo().get(0).getNumSegmentsToDownload()); + // For this single tenant, the number of unchanged segments and the number of received segments should add up to + // the total present segment + assertEquals(summaryResult.getSegmentInfo().getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(), + 
summaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged() + summaryResult.getTagsInfo() + .get(0) + .getNumSegmentsToDownload()); + long tableSize = 0; + try { + tableSize = getTableSize(getTableName()); + } catch (IOException e) { + Assert.fail("Failed to get table size", e); + } + if (tableSize > 0) { + assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes() > 0L, + "Avg segment size expected to be > 0 but found to be 0"); + } + assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), + summaryResult.getServerInfo().getServersGettingNewSegments().size()); + if (existingNumServers != newNumServers) { + assertTrue(summaryResult.getServerInfo().getNumServersGettingNewSegments() > 0, + "Expected number of servers should be > 0"); + } else { + assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), 0, + "Expected number of servers getting new segments should be 0"); + } + + if (isSegmentsToBeMoved) { + assertTrue(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() > 0, + "Segments to be moved should be > 0"); + assertTrue(summaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted() > 0, + "Segments to be moved should be > 0"); + assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(), + summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() + * summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes(), + "Estimated data to be moved in bytes doesn't match"); + assertTrue(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer() > 0, + "Estimated max number of segments to move in a single server should be > 0"); + } else { + assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0, "Segments to be moved should be 0"); + assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(), 0L, + "Estimated data to be moved in bytes should be 0"); + 
assertEquals(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer(), 0, + "Estimated max number of segments to move in a single server should be 0"); + } + + // Validate server status stats with numServers information + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = + summaryResult.getServerInfo().getServerSegmentChangeInfo(); + int numServersAdded = 0; + int numServersRemoved = 0; + int numServersUnchanged = 0; + for (RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChangeInfo : serverSegmentChangeInfoMap.values()) { + switch (serverSegmentChangeInfo.getServerStatus()) { + case UNCHANGED: + numServersUnchanged++; + break; + case ADDED: + numServersAdded++; + break; + case REMOVED: + numServersRemoved++; + break; + default: + Assert.fail(String.format("Unknown server status encountered: %s", + serverSegmentChangeInfo.getServerStatus())); + break; + } + } + if (isRealtime) { + Assert.assertNotNull(summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary()); + } + + Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), + numServersRemoved + numServersUnchanged); + Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), + numServersAdded + numServersUnchanged); + + assertEquals(numServersAdded, summaryResult.getServerInfo().getServersAdded().size()); + assertEquals(numServersRemoved, summaryResult.getServerInfo().getServersRemoved().size()); + assertEquals(numServersUnchanged, summaryResult.getServerInfo().getServersUnchanged().size()); + } + + private String getReloadJobIdFromResponse(String response) { + Pattern pattern = new JavaUtilPattern("([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); + Matcher matcher = pattern.matcher(response); + String jobId = matcher.find() ? 
matcher.group(1) : null; + if (jobId == null) { + return ""; + } + return jobId; + } + + private void waitForReloadToComplete(String reloadJobId, long timeoutMs) { + TestUtils.waitForCondition(aVoid -> { + try { + String requestUrl = getControllerRequestURLBuilder().forSegmentReloadStatus(reloadJobId); + SimpleHttpResponse httpResponse = + HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new URL(requestUrl).toURI(), null)); + ServerReloadControllerJobStatusResponse reloadResult = + JsonUtils.stringToObject(httpResponse.getResponse(), ServerReloadControllerJobStatusResponse.class); + return reloadResult.getEstimatedTimeRemainingInMinutes() == 0.0; + } catch (Exception e) { + return null; + } + }, 1000L, timeoutMs, "Failed to reload all segments"); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org