somandal commented on code in PR #15891: URL: https://github.com/apache/pinot/pull/15891#discussion_r2136666857
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +686,46 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are allowed in this tenant rebalance" + + " job. Leave blank to allow all tables from the tenant", example = "") + @QueryParam("allowTables") String allowTables, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are blocked in this tenant rebalance" + + " job. 
These table will be removed from allowTables", example = "") + @QueryParam("blockTables") String blockTables, + @ApiParam(value = "Show full rebalance results of each table in the response", example = "false") + @QueryParam("verboseResult") Boolean verboseResult, + @ApiParam(name = "rebalanceConfig", value = "The rebalance config applied to run every table", required = true) + TenantRebalanceConfig config) { // TODO decide on if the tenant rebalance should be database aware or not config.setTenantName(tenantName); + // Query params should override the config provided in the body, if present + if (degreeOfParallelism != null) { + config.setDegreeOfParallelism(degreeOfParallelism); + } + if (verboseResult != null) { + config.setVerboseResult(verboseResult); + } + if (allowTables != null) { + config.setAllowTables(Arrays.stream(StringUtil.split(allowTables, ',', 0)).map(String::strip).collect( + Collectors.toSet())); + } + if (blockTables != null) { + config.setBlockTables(Arrays.stream(StringUtil.split(blockTables, ',', 0)).map(String::strip).collect( + Collectors.toSet())); + } + if (!config.getParallelWhitelist().isEmpty() || !config.getParallelBlacklist().isEmpty()) { Review Comment: If the user specifies both the old (to-be-deprecated) parameters and the new ones, should we throw an exception? That way we can force the user to explicitly choose either the old way or the new way.
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +686,46 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are allowed in this tenant rebalance" Review Comment: nit: Re-word to: "Comma separated list of tables with type that are allowed in this tenant rebalance" Maybe you can provide an example? Please update the other parameters as well, i.e. `blockTables` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +686,46 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are allowed in this tenant rebalance" + + " job. 
Leave blank to allow all tables from the tenant", example = "") + @QueryParam("allowTables") String allowTables, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are blocked in this tenant rebalance" + + " job. These table will be removed from allowTables", example = "") + @QueryParam("blockTables") String blockTables, Review Comment: can we call this `excludeTables` instead? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java: ########## @@ -27,18 +27,32 @@ public class TenantRebalanceConfig extends RebalanceConfig { + // These fields are parameters for tenant rebalance. Hiding them in the swagger UI because we expect them to be set + // via query parameters. User can still set the fields in the POST body without errors, but it will be overridden by + // the values specified via query parameters. @JsonIgnore + @ApiModelProperty(hidden = true) private String _tenantName; @JsonProperty("degreeOfParallelism") - @ApiModelProperty(example = "1") + @ApiModelProperty(hidden = true) Review Comment: do you need to specify `hidden = true` here and for the getters? can't this be specified in one place? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +686,46 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are allowed in this tenant rebalance" + + " job. Leave blank to allow all tables from the tenant", example = "") + @QueryParam("allowTables") String allowTables, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are blocked in this tenant rebalance" + + " job. 
These table will be removed from allowTables", example = "") + @QueryParam("blockTables") String blockTables, + @ApiParam(value = "Show full rebalance results of each table in the response", example = "false") + @QueryParam("verboseResult") Boolean verboseResult, + @ApiParam(name = "rebalanceConfig", value = "The rebalance config applied to run every table", required = true) + TenantRebalanceConfig config) { // TODO decide on if the tenant rebalance should be database aware or not config.setTenantName(tenantName); + // Query params should override the config provided in the body, if present + if (degreeOfParallelism != null) { + config.setDegreeOfParallelism(degreeOfParallelism); + } + if (verboseResult != null) { + config.setVerboseResult(verboseResult); + } + if (allowTables != null) { + config.setAllowTables(Arrays.stream(StringUtil.split(allowTables, ',', 0)).map(String::strip).collect( + Collectors.toSet())); + } + if (blockTables != null) { + config.setBlockTables(Arrays.stream(StringUtil.split(blockTables, ',', 0)).map(String::strip).collect( + Collectors.toSet())); + } + if (!config.getParallelWhitelist().isEmpty() || !config.getParallelBlacklist().isEmpty()) { + // If the parallel whitelist or blacklist is set, the old tenant rebalance logic will be used + // TODO: Deprecate the support for this in the future + LOGGER.warn("Using the old tenant rebalance logic because parallel whitelist or blacklist is set. " + + "This will be deprecated in the future."); + return ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithParallelAndSequential(config); Review Comment: This will cause issues if the `_tenantRebalancer` is overridden with a new class that doesn't implement this function, right? Should the `rebalance()` API within `_tenantRebalancer` handle this scenario internally so that we don't need to typecast here and we can keep the interface clean? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java: ########## @@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManag @Override public TenantRebalanceResult rebalance(TenantRebalanceConfig config) { + Map<String, RebalanceResult> dryRunResults = new HashMap<>(); + Set<String> tables = getTenantTables(config.getTenantName()); + Set<String> allowTables = config.getAllowTables(); + if (!allowTables.isEmpty()) { + tables.retainAll(allowTables); + } + tables.removeAll(config.getBlockTables()); + tables.forEach(table -> { + try { + RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(true); + dryRunResults.put(table, + _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, createUniqueRebalanceJobIdentifier(), + false)); + } catch (TableNotFoundException exception) { + dryRunResults.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), + null, null, null, null, null)); + } + }); + if (config.isDryRun()) { + return new TenantRebalanceResult(null, dryRunResults, config.isVerboseResult()); + } + + String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier(); + TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(), + tables, _pinotHelixResourceManager); + observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null); + ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, dryRunResults); + // ensure atleast 1 thread is created to run the sequential table rebalance operations + int parallelism = Math.max(config.getDegreeOfParallelism(), 1); + try { + for (int i = 0; i < parallelism; i++) { + _executorService.submit(() -> { + while (true) { + String table = parallelQueue.poll(); + if (table == null) { + break; + } + RebalanceConfig rebalanceConfig = 
RebalanceConfig.copy(config); + rebalanceConfig.setDryRun(false); + if (dryRunResults.get(table) + .getRebalanceSummaryResult() + .getSegmentInfo() + .getReplicationFactor() + .getExpectedValueAfterRebalance() == 1) { + rebalanceConfig.setMinAvailableReplicas(0); + } + rebalanceTable(table, rebalanceConfig, dryRunResults.get(table).getJobId(), observer); + } + observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName())); + }); + } + } catch (Exception exception) { + observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(), + exception.getMessage())); + } + + // Prepare tenant rebalance result to return + Map<String, RebalanceResult> rebalanceResults = new HashMap<>(); + for (String table : dryRunResults.keySet()) { + RebalanceResult result = dryRunResults.get(table); + if (result.getStatus() == RebalanceResult.Status.DONE) { + rebalanceResults.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, + "In progress, check controller task status for the", result.getInstanceAssignment(), + result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult(), + result.getRebalanceSummaryResult())); + } else { + rebalanceResults.put(table, dryRunResults.get(table)); + } + } + return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, config.isVerboseResult()); + } + + public TenantRebalanceResult rebalanceWithParallelAndSequential(TenantRebalanceConfig config) { Review Comment: Let's add comments about this - explain this is the older way to be deprecated etc ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java: ########## @@ -32,9 +32,9 @@ public class RebalanceConfig { public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; // 1 second public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 3600000L; // 1 hour - // Whether 
to rebalance table in dry-run mode + // Whether to rebalance table in dry-run mode. @JsonProperty("dryRun") - @ApiModelProperty(example = "false") + @ApiModelProperty(example = "true") Review Comment: why are these changes needed? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java: ########## @@ -27,18 +27,32 @@ public class TenantRebalanceConfig extends RebalanceConfig { + // These fields are parameters for tenant rebalance. Hiding them in the swagger UI because we expect them to be set + // via query parameters. User can still set the fields in the POST body without errors, but it will be overridden by + // the values specified via query parameters. @JsonIgnore + @ApiModelProperty(hidden = true) private String _tenantName; @JsonProperty("degreeOfParallelism") - @ApiModelProperty(example = "1") + @ApiModelProperty(hidden = true) private int _degreeOfParallelism = 1; - @JsonProperty("parallelWhitelist") - private Set<String> _parallelWhitelist = new HashSet<>(); @JsonProperty("parallelBlacklist") + @ApiModelProperty(hidden = true) private Set<String> _parallelBlacklist = new HashSet<>(); + @JsonProperty("parallelWhitelist") + @ApiModelProperty(hidden = true) + private Set<String> _parallelWhitelist = new HashSet<>(); + // If empty, default to allow all tables + @JsonProperty("allowTables") + @ApiModelProperty(hidden = true) + private Set<String> _allowTables = new HashSet<>(); Review Comment: let's rename this to `includeTables` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java: ########## @@ -18,6 +18,10 @@ */ package org.apache.pinot.controller.helix.core.rebalance.tenant; +import java.util.Set; + + public interface TenantRebalancer { TenantRebalanceResult rebalance(TenantRebalanceConfig config); + Set<String> getTenantTables(String tenantName); Review Comment: +1 lets not add an interface method just for testing 
purposes. better to test `DefaultTenantRebalancer` IMO ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java: ########## @@ -27,18 +27,32 @@ public class TenantRebalanceConfig extends RebalanceConfig { + // These fields are parameters for tenant rebalance. Hiding them in the swagger UI because we expect them to be set + // via query parameters. User can still set the fields in the POST body without errors, but it will be overridden by + // the values specified via query parameters. @JsonIgnore + @ApiModelProperty(hidden = true) private String _tenantName; @JsonProperty("degreeOfParallelism") - @ApiModelProperty(example = "1") + @ApiModelProperty(hidden = true) private int _degreeOfParallelism = 1; - @JsonProperty("parallelWhitelist") - private Set<String> _parallelWhitelist = new HashSet<>(); @JsonProperty("parallelBlacklist") + @ApiModelProperty(hidden = true) private Set<String> _parallelBlacklist = new HashSet<>(); + @JsonProperty("parallelWhitelist") + @ApiModelProperty(hidden = true) + private Set<String> _parallelWhitelist = new HashSet<>(); + // If empty, default to allow all tables + @JsonProperty("allowTables") + @ApiModelProperty(hidden = true) + private Set<String> _allowTables = new HashSet<>(); + @JsonProperty("blockTables") + @ApiModelProperty(hidden = true) + private Set<String> _blockTables = new HashSet<>(); Review Comment: let's rename this to `excludeTables` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java: ########## @@ -690,9 +686,46 @@ public SuccessResponse deleteTenant( @ApiOperation(value = "Rebalances all the tables that are part of the tenant") public TenantRebalanceResult rebalance( @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) - @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceConfig config) { + @PathParam("tenantName") 
String tenantName, + @ApiParam(value = "Number of table rebalance jobs allowed to run at the same time", required = true, example = + "1") + @QueryParam("degreeOfParallelism") Integer degreeOfParallelism, + @ApiParam(value = + "Comma separated list of tables (with OFFLINE or REALTIME suffix) that are allowed in this tenant rebalance" + + " job. Leave blank to allow all tables from the tenant", example = "") + @QueryParam("allowTables") String allowTables, Review Comment: Can we call this `includeTables` instead? It feels like that makes the intent clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org