somandal commented on code in PR #15891: URL: https://github.com/apache/pinot/pull/15891#discussion_r2141249400
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +685,52 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables with type that should be included in this tenant rebalance" + + " job. Leaving blank defaults to include all tables from the tenant. Example: table1_REALTIME, " + + "table2_REALTIME", + example = "") + @QueryParam("includeTables") String allowTables, + @ApiParam(value = + "Comma separated list of tables with type that would be excluded in this tenant rebalance" + + " job. These tables will be removed from includeTables (that said, if a table appears in both list, " + + "it will be excluded). 
Example: table1_REALTIME, table2_REALTIME", + example = "") + @QueryParam("excludeTables") String blockTables, + @ApiParam(value = "Show full rebalance results of each table in the response", example = "false") + @QueryParam("verboseResult") Boolean verboseResult, + @ApiParam(name = "rebalanceConfig", value = "The rebalance config applied to run every table", required = true) + TenantRebalanceConfig config) { // TODO decide on if the tenant rebalance should be database aware or not config.setTenantName(tenantName); + // Query params should override the config provided in the body, if present + if (degreeOfParallelism != null) { + config.setDegreeOfParallelism(degreeOfParallelism); + } + if (verboseResult != null) { + config.setVerboseResult(verboseResult); + } + if (allowTables != null) { + config.setIncludeTables(Arrays.stream(StringUtil.split(allowTables, ',', 0)) + .map(s -> s.strip().replaceAll("^\"|\"$", "")) + .collect(Collectors.toSet())); + } + if (blockTables != null) { + config.setExcludeTables(Arrays.stream(StringUtil.split(blockTables, ',', 0)) + .map(s -> s.strip().replaceAll("^\"|\"$", "")) + .collect(Collectors.toSet())); + } + if ((!config.getExcludeTables().isEmpty() || !config.getIncludeTables().isEmpty()) && ( + !config.getParallelBlacklist().isEmpty() || !config.getParallelWhitelist().isEmpty())) { + throw new ControllerApplicationException(LOGGER, + "Bad usage by specifying both include/excludeTables and parallelWhitelist/Blacklist at the same time." + + " The latter is a deprecated usage of this API.", Review Comment: It might be cleaner to have two booleans and ensure that at most one of them is true: ``` boolean isParallelListSet = !config.getParallelBlacklist().isEmpty() || !config.getParallelWhitelist().isEmpty(); boolean isIncludeExcludeListSet = !config.getExcludeTables().isEmpty() || !config.getIncludeTables().isEmpty(); if (isParallelListSet && isIncludeExcludeListSet) { ... } ``` Also, until it is actually deprecated, can we leave out the "deprecated" wording in this message? 
Or should you mark it as deprecated in this PR itself? Otherwise, perhaps say something like "this combination of configs is not supported; use one or the other." ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +685,52 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables with type that should be included in this tenant rebalance" + + " job. Leaving blank defaults to include all tables from the tenant. Example: table1_REALTIME, " + + "table2_REALTIME", + example = "") + @QueryParam("includeTables") String allowTables, + @ApiParam(value = + "Comma separated list of tables with type that would be excluded in this tenant rebalance" + + " job. These tables will be removed from includeTables (that said, if a table appears in both list, " + + "it will be excluded). 
Example: table1_REALTIME, table2_REALTIME", + example = "") + @QueryParam("excludeTables") String blockTables, + @ApiParam(value = "Show full rebalance results of each table in the response", example = "false") + @QueryParam("verboseResult") Boolean verboseResult, + @ApiParam(name = "rebalanceConfig", value = "The rebalance config applied to run every table", required = true) + TenantRebalanceConfig config) { // TODO decide on if the tenant rebalance should be database aware or not config.setTenantName(tenantName); + // Query params should override the config provided in the body, if present + if (degreeOfParallelism != null) { + config.setDegreeOfParallelism(degreeOfParallelism); + } + if (verboseResult != null) { + config.setVerboseResult(verboseResult); + } + if (allowTables != null) { + config.setIncludeTables(Arrays.stream(StringUtil.split(allowTables, ',', 0)) + .map(s -> s.strip().replaceAll("^\"|\"$", "")) + .collect(Collectors.toSet())); + } + if (blockTables != null) { + config.setExcludeTables(Arrays.stream(StringUtil.split(blockTables, ',', 0)) + .map(s -> s.strip().replaceAll("^\"|\"$", "")) + .collect(Collectors.toSet())); + } + if ((!config.getExcludeTables().isEmpty() || !config.getIncludeTables().isEmpty()) && ( + !config.getParallelBlacklist().isEmpty() || !config.getParallelWhitelist().isEmpty())) { + throw new ControllerApplicationException(LOGGER, + "Bad usage by specifying both include/excludeTables and parallelWhitelist/Blacklist at the same time." + + " The latter is a deprecated usage of this API.", + Response.Status.BAD_REQUEST); Review Comment: can you add API responses (an `@ApiResponses` annotation) similar to the other APIs, if it makes sense? 
E.g., from above: ``` @ApiResponses(value = { @ApiResponse(code = 200, message = "Success", response = InstancePartitions.class), @ApiResponse(code = 400, message = "Failed to deserialize/validate the instance partitions"), @ApiResponse(code = 500, message = "Error updating the tenant") } ``` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TenantRebalanceIntegrationTest.java: ########## @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import java.nio.charset.StandardCharsets; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.StringUtil; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +public class TenantRebalanceIntegrationTest extends BaseHybridClusterIntegrationTest { + + private String getRebalanceUrl() { + return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(), "tenants", getServerTenant(), + "rebalance"); + } + + @Test + public void testDeprecatedParallelWhitelistBlacklistCompatibility() Review Comment: nit: let's remove the word "deprecated" from the PR until you actually deprecate things, to avoid confusion ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java: ########## @@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager, @Override public TenantRebalanceResult rebalance(TenantRebalanceConfig config) { + if (!config.getParallelWhitelist().isEmpty() || !config.getParallelBlacklist().isEmpty()) { + // If the parallel whitelist or blacklist is set, the old tenant rebalance logic will be used + // TODO: Deprecate the support for this in the future + LOGGER.warn("Using the old tenant rebalance logic because parallel whitelist or blacklist is set, " + + "which is a deprecated usage of this API."); Review Comment: nit: again, let's describe it as incompatible with the new configs rather than deprecated. 
you can comment about deprecation once you actually deprecate it ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java: ########## @@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager, @Override public TenantRebalanceResult rebalance(TenantRebalanceConfig config) { + if (!config.getParallelWhitelist().isEmpty() || !config.getParallelBlacklist().isEmpty()) { + // If the parallel whitelist or blacklist is set, the old tenant rebalance logic will be used + // TODO: Deprecate the support for this in the future + LOGGER.warn("Using the old tenant rebalance logic because parallel whitelist or blacklist is set, " + + "which is a deprecated usage of this API."); + return rebalanceWithParallelAndSequential(config); + } + return rebalanceWithIncludeExcludeTables(config); + } + + private TenantRebalanceResult rebalanceWithIncludeExcludeTables(TenantRebalanceConfig config) { + Map<String, RebalanceResult> dryRunResults = new HashMap<>(); + Set<String> tables = getTenantTables(config.getTenantName()); + Set<String> includeTables = config.getIncludeTables(); + if (!includeTables.isEmpty()) { + tables.retainAll(includeTables); + } + tables.removeAll(config.getExcludeTables()); + tables.forEach(table -> { + try { + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(true); + dryRunResults.put(table, + _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), false)); + } catch (TableNotFoundException | RebalanceInProgressException exception) { + dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), + null, null, null, null, null)); + } + }); + + if (config.isDryRun()) { + return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult()); + } + + String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier(); + 
TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(), + tables, _pinotHelixResourceManager); + observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null); + ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults); + // ensure atleast 1 thread is created to run the sequential table rebalance operations + int parallelism = Math.max(config.getDegreeOfParallelism(), 1); + try { + for (int i = 0; i < parallelism; i++) { + _executorService.submit(() -> { + while (true) { + String table = parallelQueue.poll(); + if (table == null) { + break; + } + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(false); + if (dryRunResults.get(table) + .getRebalanceSummaryResult() + .getSegmentInfo() + .getReplicationFactor() + .getExpectedValueAfterRebalance() == 1) { + rebalanceConfig.setMinAvailableReplicas(0); Review Comment: Can you add a TODO here to address the concern about someone setting `downtime=true` and the need to wait for that? That way we don't lose track of this in the code ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java: ########## @@ -28,6 +30,14 @@ public class TenantRebalanceResult { private String _jobId; private Map<String, RebalanceResult> _rebalanceTableResults; + @JsonCreator + public TenantRebalanceResult( + @JsonProperty("jobId") String jobId, + @JsonProperty("rebalanceTableResults") Map<String, RebalanceResult> rebalanceTableResults) { + _jobId = jobId; + _rebalanceTableResults = rebalanceTableResults; Review Comment: can you have this call the other constructor instead of setting these? 
``` this(jobId, rebalanceTableResults, true); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java: ########## @@ -54,20 +61,107 @@ public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager, @Override public TenantRebalanceResult rebalance(TenantRebalanceConfig config) { + if (!config.getParallelWhitelist().isEmpty() || !config.getParallelBlacklist().isEmpty()) { + // If the parallel whitelist or blacklist is set, the old tenant rebalance logic will be used + // TODO: Deprecate the support for this in the future + LOGGER.warn("Using the old tenant rebalance logic because parallel whitelist or blacklist is set, " + + "which is a deprecated usage of this API."); + return rebalanceWithParallelAndSequential(config); + } + return rebalanceWithIncludeExcludeTables(config); + } + + private TenantRebalanceResult rebalanceWithIncludeExcludeTables(TenantRebalanceConfig config) { + Map<String, RebalanceResult> dryRunResults = new HashMap<>(); + Set<String> tables = getTenantTables(config.getTenantName()); + Set<String> includeTables = config.getIncludeTables(); + if (!includeTables.isEmpty()) { + tables.retainAll(includeTables); + } + tables.removeAll(config.getExcludeTables()); + tables.forEach(table -> { + try { + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(true); + dryRunResults.put(table, + _tableRebalanceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), false)); + } catch (TableNotFoundException | RebalanceInProgressException exception) { + dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), + null, null, null, null, null)); + } + }); + + if (config.isDryRun()) { + return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult()); + } + + String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier(); + 
TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(), + tables, _pinotHelixResourceManager); + observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null); + ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults); + // ensure atleast 1 thread is created to run the sequential table rebalance operations + int parallelism = Math.max(config.getDegreeOfParallelism(), 1); + try { + for (int i = 0; i < parallelism; i++) { + _executorService.submit(() -> { + while (true) { + String table = parallelQueue.poll(); + if (table == null) { + break; + } + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(false); + if (dryRunResults.get(table) + .getRebalanceSummaryResult() + .getSegmentInfo() + .getReplicationFactor() + .getExpectedValueAfterRebalance() == 1) { + rebalanceConfig.setMinAvailableReplicas(0); + } + rebalanceTable(table, rebalanceConfig, dryRunResults.get(table).getJobId(), observer); + } + observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName())); + }); + } + } catch (Exception exception) { + observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", config.getTenantName(), + exception.getMessage())); + } + + // Prepare tenant rebalance result to return + Map<String, RebalanceResult> rebalanceResults = new HashMap<>(); + for (String table : dryRunResults.keySet()) { + RebalanceResult result = dryRunResults.get(table); + if (result.getStatus() == RebalanceResult.Status.DONE) { + rebalanceResults.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, + "In progress, check controller task status for the", result.getInstanceAssignment(), + result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult(), + result.getRebalanceSummaryResult())); + } else { + rebalanceResults.put(table, dryRunResults.get(table)); + } + } + return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, config.isVerboseResult()); + } + + // This method implements the old logic for tenant rebalance using parallel whitelist/blacklist. + // Usage of this method is now deprecated and will be removed in the future. Review Comment: nit: let's wait to add this part of the comment until you actually deprecate it. Otherwise it gets confusing to follow, since it isn't actually marked as deprecated yet -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org