somandal commented on code in PR #15891:
URL: https://github.com/apache/pinot/pull/15891#discussion_r2136666857


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +686,46 @@ public SuccessResponse deleteTenant(
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+      @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "Number of table rebalance jobs allowed to run at the 
same time", required = true, example =
+          "1")
+      @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are allowed in this tenant rebalance"
+              + " job. Leave blank to allow all tables from the tenant", 
example = "")
+      @QueryParam("allowTables") String allowTables,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are blocked in this tenant rebalance"
+              + " job. These table will be removed from allowTables", example 
= "")
+      @QueryParam("blockTables") String blockTables,
+      @ApiParam(value = "Show full rebalance results of each table in the 
response", example = "false")
+      @QueryParam("verboseResult") Boolean verboseResult,
+      @ApiParam(name = "rebalanceConfig", value = "The rebalance config 
applied to run every table", required = true)
+      TenantRebalanceConfig config) {
     // TODO decide on if the tenant rebalance should be database aware or not
     config.setTenantName(tenantName);
+    // Query params should override the config provided in the body, if present
+    if (degreeOfParallelism != null) {
+      config.setDegreeOfParallelism(degreeOfParallelism);
+    }
+    if (verboseResult != null) {
+      config.setVerboseResult(verboseResult);
+    }
+    if (allowTables != null) {
+      config.setAllowTables(Arrays.stream(StringUtil.split(allowTables, ',', 
0)).map(String::strip).collect(
+          Collectors.toSet()));
+    }
+    if (blockTables != null) {
+      config.setBlockTables(Arrays.stream(StringUtil.split(blockTables, ',', 
0)).map(String::strip).collect(
+          Collectors.toSet()));
+    }
+    if (!config.getParallelWhitelist().isEmpty() || 
!config.getParallelBlacklist().isEmpty()) {

Review Comment:
   If the user specifies both the old, soon-to-be-deprecated parameters and the 
new ones, should we throw an exception? That would force the user to state their 
intent: use either the old way or the new way, not both.
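
   A minimal sketch of the check suggested above, assuming the four sets are read 
from the config. The class and method names are hypothetical and not part of the PR; 
in the resource itself the failure would presumably surface as a 400 response rather 
than a bare `IllegalArgumentException`.

```java
import java.util.Set;

public class TenantRebalanceParamCheck {
  // Hypothetical helper (not in the PR): rejects requests that mix the legacy
  // parallelWhitelist/parallelBlacklist fields with the new table filters.
  public static void validateExclusive(Set<String> parallelWhitelist, Set<String> parallelBlacklist,
      Set<String> allowTables, Set<String> blockTables) {
    boolean usesLegacy = !parallelWhitelist.isEmpty() || !parallelBlacklist.isEmpty();
    boolean usesNew = !allowTables.isEmpty() || !blockTables.isEmpty();
    if (usesLegacy && usesNew) {
      throw new IllegalArgumentException(
          "Specify either parallelWhitelist/parallelBlacklist or allowTables/blockTables, not both");
    }
  }

  public static void main(String[] args) {
    // Only the new filters are set, so this call passes.
    validateExclusive(Set.of(), Set.of(), Set.of("myTable_OFFLINE"), Set.of());
    try {
      // Legacy and new filters are both set, so this call is rejected.
      validateExclusive(Set.of("myTable_OFFLINE"), Set.of(), Set.of("otherTable_REALTIME"), Set.of());
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```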



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +686,46 @@ public SuccessResponse deleteTenant(
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+      @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "Number of table rebalance jobs allowed to run at the 
same time", required = true, example =
+          "1")
+      @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are allowed in this tenant rebalance"

Review Comment:
   nit: Reword to: "Comma separated list of tables with type that are allowed 
in this tenant rebalance"
   Maybe you can provide an example?
   
   Please update the other parameters as well, i.e. `blockTables`.
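
   To illustrate the kind of example value that could go in the description, here is 
a rough sketch of how a list such as `myTable_OFFLINE, myTable_REALTIME` would be 
parsed. The table names and the `TableListParam` class are made up for illustration, 
and plain `String.split` stands in for the `StringUtil.split` call used in the PR.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TableListParam {
  // Hypothetical helper: splits a comma separated list of table names with type,
  // trims whitespace around each entry, and drops empty entries.
  public static Set<String> parse(String csv) {
    return Arrays.stream(csv.split(","))
        .map(String::strip)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    // Example value for the query parameter; yields a set with two table names.
    System.out.println(parse("myTable_OFFLINE, myTable_REALTIME"));
    // A blank value yields an empty set, i.e. "allow all tables" in the PR's semantics.
    System.out.println(parse("").isEmpty());
  }
}
```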



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +686,46 @@ public SuccessResponse deleteTenant(
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+      @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "Number of table rebalance jobs allowed to run at the 
same time", required = true, example =
+          "1")
+      @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are allowed in this tenant rebalance"
+              + " job. Leave blank to allow all tables from the tenant", 
example = "")
+      @QueryParam("allowTables") String allowTables,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are blocked in this tenant rebalance"
+              + " job. These table will be removed from allowTables", example 
= "")
+      @QueryParam("blockTables") String blockTables,

Review Comment:
   can we call this `excludeTables` instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java:
##########
@@ -27,18 +27,32 @@
 
 
 public class TenantRebalanceConfig extends RebalanceConfig {
+  // These fields are parameters for tenant rebalance. Hiding them in the 
swagger UI because we expect them to be set
+  // via query parameters. User can still set the fields in the POST body 
without errors, but it will be overridden by
+  // the values specified via query parameters.
   @JsonIgnore
+  @ApiModelProperty(hidden = true)
   private String _tenantName;
   @JsonProperty("degreeOfParallelism")
-  @ApiModelProperty(example = "1")
+  @ApiModelProperty(hidden = true)

Review Comment:
   Do you need to specify `hidden = true` both here and on the getters? Can't 
this be specified in one place?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +686,46 @@ public SuccessResponse deleteTenant(
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+      @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "Number of table rebalance jobs allowed to run at the 
same time", required = true, example =
+          "1")
+      @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are allowed in this tenant rebalance"
+              + " job. Leave blank to allow all tables from the tenant", 
example = "")
+      @QueryParam("allowTables") String allowTables,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are blocked in this tenant rebalance"
+              + " job. These table will be removed from allowTables", example 
= "")
+      @QueryParam("blockTables") String blockTables,
+      @ApiParam(value = "Show full rebalance results of each table in the 
response", example = "false")
+      @QueryParam("verboseResult") Boolean verboseResult,
+      @ApiParam(name = "rebalanceConfig", value = "The rebalance config 
applied to run every table", required = true)
+      TenantRebalanceConfig config) {
     // TODO decide on if the tenant rebalance should be database aware or not
     config.setTenantName(tenantName);
+    // Query params should override the config provided in the body, if present
+    if (degreeOfParallelism != null) {
+      config.setDegreeOfParallelism(degreeOfParallelism);
+    }
+    if (verboseResult != null) {
+      config.setVerboseResult(verboseResult);
+    }
+    if (allowTables != null) {
+      config.setAllowTables(Arrays.stream(StringUtil.split(allowTables, ',', 
0)).map(String::strip).collect(
+          Collectors.toSet()));
+    }
+    if (blockTables != null) {
+      config.setBlockTables(Arrays.stream(StringUtil.split(blockTables, ',', 
0)).map(String::strip).collect(
+          Collectors.toSet()));
+    }
+    if (!config.getParallelWhitelist().isEmpty() || 
!config.getParallelBlacklist().isEmpty()) {
+      // If the parallel whitelist or blacklist is set, the old tenant 
rebalance logic will be used
+      // TODO: Deprecate the support for this in the future
+      LOGGER.warn("Using the old tenant rebalance logic because parallel 
whitelist or blacklist is set. "
+          + "This will be deprecated in the future.");
+      return ((DefaultTenantRebalancer) 
_tenantRebalancer).rebalanceWithParallelAndSequential(config);

Review Comment:
   This will cause issues if the `_tenantRebalancer` is overridden with a new 
class that doesn't implement this function, right?
   
   Should the `rebalance()` API within `_tenantRebalancer` handle this scenario 
internally so that we don't need to typecast here and we can keep the interface 
clean?
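
   A rough sketch of the alternative described above, using trimmed-down stand-ins 
for the PR's types (`Config`, `Rebalancer`, and `DefaultRebalancer` here are 
hypothetical simplifications, and the string return values only mark which path ran). 
The legacy branch is chosen inside `rebalance()`, so the caller never casts.

```java
import java.util.Set;

public class RebalancerDispatchSketch {
  // Simplified stand-in for TenantRebalanceConfig (assumption, not the real class).
  public static class Config {
    public Set<String> parallelWhitelist = Set.of();
    public Set<String> parallelBlacklist = Set.of();
  }

  // Simplified stand-in for the TenantRebalancer interface: a single entry point.
  public interface Rebalancer {
    String rebalance(Config config);
  }

  public static class DefaultRebalancer implements Rebalancer {
    @Override
    public String rebalance(Config config) {
      // The implementation decides which path to take; callers stay cast-free.
      if (!config.parallelWhitelist.isEmpty() || !config.parallelBlacklist.isEmpty()) {
        return rebalanceWithParallelAndSequential(config);
      }
      return "new";
    }

    // Legacy path, kept private so it is not part of the public surface.
    private String rebalanceWithParallelAndSequential(Config config) {
      return "legacy";
    }
  }

  public static void main(String[] args) {
    Rebalancer rebalancer = new DefaultRebalancer();
    Config config = new Config();
    System.out.println(rebalancer.rebalance(config));
    config.parallelWhitelist = Set.of("myTable_OFFLINE");
    System.out.println(rebalancer.rebalance(config));
  }
}
```

   With this shape, a custom `Rebalancer` implementation that has no legacy path 
simply never takes that branch, and the REST resource only ever calls `rebalance()`.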



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -49,6 +56,80 @@ public DefaultTenantRebalancer(PinotHelixResourceManager 
pinotHelixResourceManag
 
   @Override
   public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
+    Map<String, RebalanceResult> dryRunResults = new HashMap<>();
+    Set<String> tables = getTenantTables(config.getTenantName());
+    Set<String> allowTables = config.getAllowTables();
+    if (!allowTables.isEmpty()) {
+      tables.retainAll(allowTables);
+    }
+    tables.removeAll(config.getBlockTables());
+    tables.forEach(table -> {
+      try {
+        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+        rebalanceConfig.setDryRun(true);
+        dryRunResults.put(table,
+            _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, 
createUniqueRebalanceJobIdentifier(),
+                false));
+      } catch (TableNotFoundException exception) {
+        dryRunResults.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null, null, null));
+      }
+    });
+    if (config.isDryRun()) {
+      return new TenantRebalanceResult(null, dryRunResults, 
config.isVerboseResult());
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new 
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, 
null);
+    ConcurrentLinkedQueue<String> parallelQueue = createTableQueue(config, 
dryRunResults);
+    // ensure atleast 1 thread is created to run the sequential table 
rebalance operations
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    try {
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          while (true) {
+            String table = parallelQueue.poll();
+            if (table == null) {
+              break;
+            }
+            RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+            rebalanceConfig.setDryRun(false);
+            if (dryRunResults.get(table)
+                .getRebalanceSummaryResult()
+                .getSegmentInfo()
+                .getReplicationFactor()
+                .getExpectedValueAfterRebalance() == 1) {
+              rebalanceConfig.setMinAvailableReplicas(0);
+            }
+            rebalanceTable(table, rebalanceConfig, 
dryRunResults.get(table).getJobId(), observer);
+          }
+          observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", config.getTenantName()));
+        });
+      }
+    } catch (Exception exception) {
+      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", config.getTenantName(),
+          exception.getMessage()));
+    }
+
+    // Prepare tenant rebalance result to return
+    Map<String, RebalanceResult> rebalanceResults = new HashMap<>();
+    for (String table : dryRunResults.keySet()) {
+      RebalanceResult result = dryRunResults.get(table);
+      if (result.getStatus() == RebalanceResult.Status.DONE) {
+        rebalanceResults.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
+            "In progress, check controller task status for the", 
result.getInstanceAssignment(),
+            result.getTierInstanceAssignment(), result.getSegmentAssignment(), 
result.getPreChecksResult(),
+            result.getRebalanceSummaryResult()));
+      } else {
+        rebalanceResults.put(table, dryRunResults.get(table));
+      }
+    }
+    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, 
config.isVerboseResult());
+  }
+
+  public TenantRebalanceResult 
rebalanceWithParallelAndSequential(TenantRebalanceConfig config) {

Review Comment:
   Let's add comments about this: explain that this is the older path, which is 
to be deprecated, etc.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -32,9 +32,9 @@ public class RebalanceConfig {
   public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; 
// 1 second
   public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 
3600000L; // 1 hour
 
-  // Whether to rebalance table in dry-run mode
+  // Whether to rebalance table in dry-run mode.
   @JsonProperty("dryRun")
-  @ApiModelProperty(example = "false")
+  @ApiModelProperty(example = "true")

Review Comment:
   why are these changes needed?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java:
##########
@@ -27,18 +27,32 @@
 
 
 public class TenantRebalanceConfig extends RebalanceConfig {
+  // These fields are parameters for tenant rebalance. Hiding them in the 
swagger UI because we expect them to be set
+  // via query parameters. User can still set the fields in the POST body 
without errors, but it will be overridden by
+  // the values specified via query parameters.
   @JsonIgnore
+  @ApiModelProperty(hidden = true)
   private String _tenantName;
   @JsonProperty("degreeOfParallelism")
-  @ApiModelProperty(example = "1")
+  @ApiModelProperty(hidden = true)
   private int _degreeOfParallelism = 1;
-  @JsonProperty("parallelWhitelist")
-  private Set<String> _parallelWhitelist = new HashSet<>();
   @JsonProperty("parallelBlacklist")
+  @ApiModelProperty(hidden = true)
   private Set<String> _parallelBlacklist = new HashSet<>();
+  @JsonProperty("parallelWhitelist")
+  @ApiModelProperty(hidden = true)
+  private Set<String> _parallelWhitelist = new HashSet<>();
+  // If empty, default to allow all tables
+  @JsonProperty("allowTables")
+  @ApiModelProperty(hidden = true)
+  private Set<String> _allowTables = new HashSet<>();

Review Comment:
   let's rename this to `includeTables`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
+import java.util.Set;
+
+
 public interface TenantRebalancer {
   TenantRebalanceResult rebalance(TenantRebalanceConfig config);
+  Set<String> getTenantTables(String tenantName);

Review Comment:
   +1, let's not add an interface method just for testing purposes. Better to 
test `DefaultTenantRebalancer` directly, IMO.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java:
##########
@@ -27,18 +27,32 @@
 
 
 public class TenantRebalanceConfig extends RebalanceConfig {
+  // These fields are parameters for tenant rebalance. Hiding them in the 
swagger UI because we expect them to be set
+  // via query parameters. User can still set the fields in the POST body 
without errors, but it will be overridden by
+  // the values specified via query parameters.
   @JsonIgnore
+  @ApiModelProperty(hidden = true)
   private String _tenantName;
   @JsonProperty("degreeOfParallelism")
-  @ApiModelProperty(example = "1")
+  @ApiModelProperty(hidden = true)
   private int _degreeOfParallelism = 1;
-  @JsonProperty("parallelWhitelist")
-  private Set<String> _parallelWhitelist = new HashSet<>();
   @JsonProperty("parallelBlacklist")
+  @ApiModelProperty(hidden = true)
   private Set<String> _parallelBlacklist = new HashSet<>();
+  @JsonProperty("parallelWhitelist")
+  @ApiModelProperty(hidden = true)
+  private Set<String> _parallelWhitelist = new HashSet<>();
+  // If empty, default to allow all tables
+  @JsonProperty("allowTables")
+  @ApiModelProperty(hidden = true)
+  private Set<String> _allowTables = new HashSet<>();
+  @JsonProperty("blockTables")
+  @ApiModelProperty(hidden = true)
+  private Set<String> _blockTables = new HashSet<>();

Review Comment:
   let's rename this to `excludeTables`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -690,9 +686,46 @@ public SuccessResponse deleteTenant(
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+      @PathParam("tenantName") String tenantName,
+      @ApiParam(value = "Number of table rebalance jobs allowed to run at the 
same time", required = true, example =
+          "1")
+      @QueryParam("degreeOfParallelism") Integer degreeOfParallelism,
+      @ApiParam(value =
+          "Comma separated list of tables (with OFFLINE or REALTIME suffix) 
that are allowed in this tenant rebalance"
+              + " job. Leave blank to allow all tables from the tenant", 
example = "")
+      @QueryParam("allowTables") String allowTables,

Review Comment:
   Can we call this `includeTables` instead? It feels like it makes the intent 
clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

