J-HowHuang commented on code in PR #15575: URL: https://github.com/apache/pinot/pull/15575#discussion_r2054914910
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java: ########## @@ -0,0 +1,1170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.common.utils.regex.JavaUtilPattern; +import org.apache.pinot.common.utils.regex.Matcher; +import org.apache.pinot.common.utils.regex.Pattern; +import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse; +import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse; +import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResult; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TagOverrideConfig; +import org.apache.pinot.spi.config.table.TenantConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + + +public class TableRebalanceIntegrationTest extends HybridClusterIntegrationTest { + private String getRebalanceUrl(RebalanceConfig rebalanceConfig, TableType tableType) { + return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(), "tables", getTableName(), "rebalance") + + "?type=" + tableType.toString() + "&" + rebalanceConfig.toQueryString(); + } + + @Test + public void testOfflineRebalancePreChecks() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getOfflineTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure pre-check status is null if not enabled + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + + // Enable pre-checks, nothing is set + rebalanceConfig.setPreChecks(true); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Enable minimizeDataMovement + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = + Collections.singletonMap("OFFLINE", createInstanceAssignmentConfig(true, TableType.OFFLINE)); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Override minimizeDataMovement + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use default minimizeDataMovement and disable it in table config + instanceAssignmentConfigMap = + Collections.singletonMap("OFFLINE", createInstanceAssignmentConfig(false, TableType.OFFLINE)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled but instance assignment is allowed", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Undo minimizeDataMovement, update the table config to add a column to bloom filter + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter"); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Undo tableConfig change + tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter"); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Validate that the status for reload is still PASS (i.e. even though an extra server is tagged which has no + // segments assigned for this table, we don't try to get needReload status from that extra server, otherwise + // ERROR status would be returned) + BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS); + createServerTenant(getServerTenant(), 1, 0); + rebalanceConfig.setReassignInstances(true); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + rebalanceConfig.setReassignInstances(false); + + // Stop the added server + serverStarter0.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter0.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + + // Add a schema change. Notice that this may affect other following tests + Schema schema = createSchema(); + schema.addField(new MetricFieldSpec("NewAddedIntMetricA", FieldSpec.DataType.INT, 1)); + updateSchema(schema); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Keep schema change and update table config to add minimizeDataMovement + instanceAssignmentConfigMap = + Collections.singletonMap("OFFLINE", createInstanceAssignmentConfig(true, TableType.OFFLINE)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Keep schema change and update table config to add instance config map with minimizeDataMovement = false + instanceAssignmentConfigMap = + Collections.singletonMap("OFFLINE", createInstanceAssignmentConfig(false, TableType.OFFLINE)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("OFFLINE").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nOFFLINE segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Trigger rebalance config warning + BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1); + createServerTenant(getServerTenant(), 1, 0); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setBestEfforts(true); + rebalanceConfig.setBootstrap(true); + rebalanceConfig.setMinAvailableReplicas(-1); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, + "bestEfforts is enabled, only enable it if you know what you are doing\n" + + "bootstrap is enabled which can cause a large amount of data movement, double check if this is " + + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // reload + response = + sendPostRequest(getControllerRequestURLBuilder().forTableReload(getTableName(), TableType.OFFLINE, true)); + waitForReloadToComplete(getReloadJobIdFromResponse(response), 30_000); + // reload realtime table as well for other realtime tests + response = + sendPostRequest(getControllerRequestURLBuilder().forTableReload(getTableName(), TableType.REALTIME, false)); + waitForReloadToComplete(getReloadJobIdFromResponse(response), 30_000); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "bestEfforts is enabled, only enable it if you know what you are doing\n" + + "bootstrap is enabled which can cause a large amount of data movement, double check if this is " + + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN, + "OFFLINE segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Disable dry-run + rebalanceConfig.setBootstrap(false); + rebalanceConfig.setBestEfforts(false); + rebalanceConfig.setDryRun(false); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + + // Stop the added server + serverStarter1.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter1.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + updateTableConfig(originalTableConfig); + } + + @Test + public void testRealtimeRebalancePreCheckMinimizeDataMovement() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getRealtimeTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure pre-check status is null if not enabled + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setIncludeConsuming(true); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Use MinimizeDataMovementOptions.DEFAULT and disable it in table config for COMPLETED ONLY + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for COMPLETED segments, but instance assignment is allowed", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for COMPLETED segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DEFAULT and disable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for CONSUMING segments, but instance assignment is allowed", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DISABLE and enable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Set minimizeDataMovement to true and false respectively for COMPLETED and CONSUMING segments + instanceAssignmentConfigMap = new HashMap<>(); + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for either or both COMPLETED and CONSUMING segments, but instance " + + "assignment is allowed for both", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.DISABLE and MinimizeDataMovementOptions.DEFAULT in + // this case + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + // Use MinimizeDataMovementOptions.ENABLE + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // response will be the same for MinimizeDataMovementOptions.ENABLE and MinimizeDataMovementOptions.DEFAULT in + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + assertEquals(JsonUtils.stringToObject(response, RebalanceResult.class) + .getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage()); + + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for both COMPLETED and CONSUMING segments in table config but it's " + + "overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + instanceAssignmentConfigMap.put("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + instanceAssignmentConfigMap.put("COMPLETED", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DEFAULT); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is not enabled for either or both COMPLETED and CONSUMING segments, but instance " + + "assignment is allowed for both", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - numReplicaGroups: " + replicaGroupPartitionConfig.getNumReplicaGroups() + + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Use MinimizeDataMovementOptions.DISABLE and enable it in table config for CONSUMING ONLY + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(), getServerTenant(), + new TagOverrideConfig(null, TagNameUtils.getRealtimeTagForTenant(getServerTenant()))); + tableConfig.setTenantConfig(tenantConfig); + updateTableConfig(tableConfig); + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.DISABLE); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden with disabled", + RebalancePreCheckerResult.PreCheckStatus.WARN, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + updateTableConfig(originalTableConfig); + } + + @Test + public void testRealtimeRebalancePreChecks() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getRealtimeTableConfig(); + TableConfig originalTableConfig = new TableConfig(tableConfig); + + // Ensure pre-check status is null if not enabled + String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + + // Enable pre-checks, nothing is set + rebalanceConfig.setPreChecks(true); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "includeConsuming is disabled for a realtime table.", + RebalancePreCheckerResult.PreCheckStatus.WARN, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Enable minimizeDataMovement, enable replica group only for COMPLETED segments + Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + rebalanceConfig.setIncludeConsuming(true); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "No need to reload", RebalancePreCheckerResult.PreCheckStatus.PASS, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Undo minimizeDataMovement, update the table config to add a column to bloom filter + rebalanceConfig.setMinimizeDataMovement(RebalanceConfig.MinimizeDataMovementOptions.ENABLE); + tableConfig.getIndexingConfig().getBloomFilterColumns().add("Quarter"); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Undo tableConfig change + tableConfig.getIndexingConfig().getBloomFilterColumns().remove("Quarter"); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Validate that the status for reload is still PASS (i.e. even though an extra server is tagged which has no + // segments assigned for this table, we don't try to get needReload status from that extra server, otherwise + // ERROR status would be returned) + BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS); + createServerTenant(getServerTenant(), 0, 1); + rebalanceConfig.setReassignInstances(true); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "No need to reload", + RebalancePreCheckerResult.PreCheckStatus.PASS, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + rebalanceConfig.setReassignInstances(false); + + // Stop the added server + serverStarter0.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter0.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + + // Add a schema change. Notice that this may affect other following tests + Schema schema = getSchema(getTableName()); + schema.addField(new MetricFieldSpec("NewAddedIntMetricB", FieldSpec.DataType.INT, 1)); + updateSchema(schema); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Keep schema change and update table config to add minimizeDataMovement + instanceAssignmentConfigMap = + Collections.singletonMap("COMPLETED", createInstanceAssignmentConfig(true, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("COMPLETED").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", RebalancePreCheckerResult.PreCheckStatus.PASS, + "Reload needed prior to running rebalance", RebalancePreCheckerResult.PreCheckStatus.WARN, + "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()) + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Keep schema change and update table config to add instance config map with minimizeDataMovement = false + instanceAssignmentConfigMap = + Collections.singletonMap("CONSUMING", createInstanceAssignmentConfig(false, TableType.REALTIME)); + replicaGroupPartitionConfig = instanceAssignmentConfigMap.get("CONSUMING").getReplicaGroupPartitionConfig(); + tableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.NO_OP, + "minimizeDataMovement is enabled", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, "All rebalance parameters look good", + RebalancePreCheckerResult.PreCheckStatus.PASS, + "reassignInstances is disabled, replica groups may not be updated.\nCOMPLETED segments - Replica Groups are " + + "not enabled, replication: " + tableConfig.getReplication() + "\nCONSUMING segments - numReplicaGroups: " + + replicaGroupPartitionConfig.getNumReplicaGroups() + ", numInstancesPerReplicaGroup: " + + (replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup() == 0 + ? "0 (using as many instances as possible)" : replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup()), + RebalancePreCheckerResult.PreCheckStatus.WARN); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + // Trigger rebalance config warning + BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1); + createServerTenant(getServerTenant(), 0, 1); + rebalanceConfig.setReassignInstances(true); + rebalanceConfig.setBestEfforts(true); + rebalanceConfig.setBootstrap(true); + rebalanceConfig.setMinAvailableReplicas(-1); + tableConfig.setInstanceAssignmentConfigMap(null); + updateTableConfig(tableConfig); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + checkRebalancePreCheckStatus(rebalanceResult, RebalanceResult.Status.DONE, + "Instance assignment not allowed, no need for minimizeDataMovement", + RebalancePreCheckerResult.PreCheckStatus.PASS, "Reload needed prior to running rebalance", + RebalancePreCheckerResult.PreCheckStatus.WARN, + "bestEfforts is enabled, only enable it if you know what you are doing\n" + + "bootstrap is enabled which can cause a large amount of data movement, double check if this is " + + "intended", RebalancePreCheckerResult.PreCheckStatus.WARN, + "COMPLETED segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication() + + "\nCONSUMING segments - Replica Groups are not enabled, replication: " + tableConfig.getReplication(), + RebalancePreCheckerResult.PreCheckStatus.PASS); + + // Disable dry-run + rebalanceConfig.setBootstrap(false); + rebalanceConfig.setBestEfforts(false); + rebalanceConfig.setDryRun(false); + response = sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME)); + rebalanceResult = JsonUtils.stringToObject(response, RebalanceResult.class); + assertNull(rebalanceResult.getPreChecksResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + + // Stop the added server + serverStarter1.stop(); + TestUtils.waitForCondition( + aVoid -> getHelixResourceManager().dropInstance(serverStarter1.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + updateTableConfig(originalTableConfig); + } + + private void checkRebalancePreCheckStatus(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, + String expectedMinimizeDataMovement, RebalancePreCheckerResult.PreCheckStatus expectedMinimizeDataMovementStatus, + String expectedNeedsReloadMessage, RebalancePreCheckerResult.PreCheckStatus expectedNeedsReloadStatus, + String expectedRebalanceConfig, RebalancePreCheckerResult.PreCheckStatus expectedRebalanceConfigStatus, + String expectedReplicaGroupMessage, RebalancePreCheckerResult.PreCheckStatus expectedReplicaGroupStatus) { + assertEquals(rebalanceResult.getStatus(), expectedStatus); + Map<String, RebalancePreCheckerResult> preChecksResult = rebalanceResult.getPreChecksResult(); + assertNotNull(preChecksResult); + assertEquals(preChecksResult.size(), 6); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)); + assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS)); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getPreCheckStatus(), + expectedMinimizeDataMovementStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT).getMessage(), + expectedMinimizeDataMovement); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getPreCheckStatus(), + expectedNeedsReloadStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS).getMessage(), + expectedNeedsReloadMessage); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(), + expectedRebalanceConfigStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(), + expectedRebalanceConfig); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(), + expectedReplicaGroupStatus); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(), + expectedReplicaGroupMessage); + // As the disk utilization check periodic task was disabled in the test controller (ControllerConf + // .RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY was set to 30000s, see org.apache.pinot.controller.helix + // .ControllerTest.getDefaultControllerConfiguration), server's disk util should be unavailable on all servers if + // not explicitly set via org.apache.pinot.controller.validation.ResourceUtilizationInfo.setDiskUsageInfo + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + assertEquals(preChecksResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(), + RebalancePreCheckerResult.PreCheckStatus.WARN); + } + + private InstanceAssignmentConfig createInstanceAssignmentConfig(boolean minimizeDataMovement, TableType tableType) { + InstanceTagPoolConfig instanceTagPoolConfig = + new InstanceTagPoolConfig(TagNameUtils.getServerTagForTenant(getServerTenant(), tableType), false, 1, null); + List<String> constraints = new ArrayList<>(); + constraints.add("constraints1"); + InstanceConstraintConfig instanceConstraintConfig = new InstanceConstraintConfig(constraints); + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 1, 1, + 1, 1, 1, minimizeDataMovement, + null); + return new InstanceAssignmentConfig(instanceTagPoolConfig, + instanceConstraintConfig, instanceReplicaGroupPartitionConfig, null, minimizeDataMovement); + } + + @Test + public void testOfflineRebalanceDryRunSummary() Review Comment: Both pre-check and summary tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org