This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6b1b4feba Tenant rebalance and status tracking APIs (#11128)
d6b1b4feba is described below

commit d6b1b4feba1f9a2168f5119e1b673d7cfcf8d146
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Sat Aug 12 02:23:10 2023 +0530

    Tenant rebalance and status tracking APIs (#11128)
    
    * endpoint to rebalance tables of a tenant
    
    * move tenant rebalance logic to a separate TenantRebalancer service
    
    * implement extractRebalanceConfig
    
    * function rename
    
    * observability
    
    * status reporting
    
    * moved tenant rebalance related classes to a separate package
    
    * ignore null values in response on RebalanceResult
    
    * bugfix, initiate the _remainingTables field
    
    * integration test for tenant rebalance
    
    * break while loop gracefully
    
    * reuse the last active parallel thread to run the sequential jobs
    
    * metadata npe handling
    
    * maintain separate executor service for tenant rebalance operations
    
    * prioritise dimension tables
    
    * var name fix
    
    * todo comment added
    
    * Add Authorize annotations to APIs
---
 .../metadata/controllerjob/ControllerJobType.java  |   2 +-
 .../pinot/controller/BaseControllerStarter.java    |  10 +
 .../api/resources/PinotTenantRestletResource.java  |  55 +++++
 .../TenantRebalanceJobStatusResponse.java          |  43 ++++
 .../helix/core/rebalance/RebalanceContext.java     | 138 ++++++++++++
 .../helix/core/rebalance/RebalanceResult.java      |   5 +
 .../rebalance/tenant/DefaultTenantRebalancer.java  | 234 +++++++++++++++++++++
 .../rebalance/tenant/TenantRebalanceContext.java   |  81 +++++++
 .../rebalance/tenant/TenantRebalanceObserver.java  |  21 +-
 .../tenant/TenantRebalanceProgressStats.java       | 114 ++++++++++
 .../rebalance/tenant/TenantRebalanceResult.java    |  59 ++++++
 .../core/rebalance/tenant/TenantRebalancer.java    |   7 +-
 .../tenant/ZkBasedTenantRebalanceObserver.java     | 116 ++++++++++
 .../rebalance/tenant/TenantRebalancerTest.java     | 191 +++++++++++++++++
 .../java/org/apache/pinot/core/auth/Actions.java   |   1 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 16 files changed, 1071 insertions(+), 7 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
index aaad454787..025d7b6c39 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.common.metadata.controllerjob;
 
 public enum ControllerJobType {
-  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 594ca27bfd..610c0a01f0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -85,6 +85,8 @@ import 
org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
 import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
 import 
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
@@ -173,6 +175,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected StaleInstancesCleanupTask _staleInstancesCleanupTask;
   protected TaskMetricsEmitter _taskMetricsEmitter;
   protected MultiThreadedHttpConnectionManager _connectionManager;
+  protected TenantRebalancer _tenantRebalancer;
+  protected ExecutorService _tenantRebalanceExecutorService;
 
   @Override
   public void init(PinotConfiguration pinotConfiguration)
@@ -223,6 +227,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       // This executor service is used to do async tasks from multiget util or 
table rebalancing.
       _executorService =
           Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
+      _tenantRebalanceExecutorService =
+          Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setNameFormat("tenant-rebalance-thread-%d").build());
+      _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, 
_tenantRebalanceExecutorService);
     }
 
     // Initialize the table config tuner registry.
@@ -484,6 +491,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
         bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
         
bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
+        bind(_tenantRebalancer).to(TenantRebalancer.class);
         String loggerRootDir = 
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
         if (loggerRootDir != null) {
           bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class);
@@ -781,6 +789,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
       LOGGER.info("Shutting down executor service");
       _executorService.shutdownNow();
       _executorService.awaitTermination(10L, TimeUnit.SECONDS);
+      _tenantRebalanceExecutorService.shutdownNow();
+      _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 70a24e15e4..f160bba80d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -32,6 +32,7 @@ import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -49,6 +50,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.api.access.AccessType;
@@ -56,6 +58,10 @@ import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
@@ -64,6 +70,7 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,6 +113,9 @@ public class PinotTenantRestletResource {
   @Inject
   ControllerMetrics _controllerMetrics;
 
+  @Inject
+  TenantRebalancer _tenantRebalancer;
+
   @POST
   @Path("/tenants")
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.CREATE_TENANT)
@@ -565,4 +575,49 @@ public class PinotTenantRestletResource {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_TENANT_DELETE_ERROR,
 1L);
     throw new ControllerApplicationException(LOGGER, "Error deleting tenant", 
Response.Status.INTERNAL_SERVER_ERROR);
   }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.REBALANCE_TENANT_TABLES)
+  @Path("/tenants/{tenantName}/rebalance")
+  @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
+  public TenantRebalanceResult rebalance(
+      @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
+      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceContext context) {
+    context.setTenantName(tenantName);
+    return _tenantRebalancer.rebalance(context);
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.READ)
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_REBALANCE_STATUS)
+  @Path("/tenants/rebalanceStatus/{jobId}")
+  @ApiOperation(value = "Gets detailed stats of a tenant rebalance operation",
+      notes = "Gets detailed stats of a tenant rebalance operation")
+  public TenantRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Tenant rebalance job id", required = true) 
@PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TENANT_REBALANCE);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    TenantRebalanceProgressStats tenantRebalanceProgressStats =
+        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+            TenantRebalanceProgressStats.class);
+    long timeSinceStartInSecs = 
tenantRebalanceProgressStats.getTimeToFinishInSeconds();
+    if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) {
+      timeSinceStartInSecs =
+          (System.currentTimeMillis() - 
tenantRebalanceProgressStats.getStartTimeMs()) / 1000;
+    }
+
+    TenantRebalanceJobStatusResponse tenantRebalanceJobStatusResponse = new 
TenantRebalanceJobStatusResponse();
+    
tenantRebalanceJobStatusResponse.setTenantRebalanceProgressStats(tenantRebalanceProgressStats);
+    
tenantRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+    return tenantRebalanceJobStatusResponse;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
new file mode 100644
index 0000000000..db1bfda9e8
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+
+
+public class TenantRebalanceJobStatusResponse {
+  private long _timeElapsedSinceStartInSeconds;
+  private TenantRebalanceProgressStats _tenantRebalanceProgressStats;
+
+  public long getTimeElapsedSinceStartInSeconds() {
+    return _timeElapsedSinceStartInSeconds;
+  }
+
+  public void setTimeElapsedSinceStartInSeconds(long 
timeElapsedSinceStartInSeconds) {
+    _timeElapsedSinceStartInSeconds = timeElapsedSinceStartInSeconds;
+  }
+
+  public TenantRebalanceProgressStats getTenantRebalanceProgressStats() {
+    return _tenantRebalanceProgressStats;
+  }
+
+  public void setTenantRebalanceProgressStats(TenantRebalanceProgressStats 
tenantRebalanceProgressStats) {
+    _tenantRebalanceProgressStats = tenantRebalanceProgressStats;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
new file mode 100644
index 0000000000..cd6e06c399
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+@ApiModel
+public class RebalanceContext {
+  // TODO : simplify the rebalance configs wherever possible
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private Boolean _dryRun = false;
+  @JsonProperty("reassignInstances")
+  @ApiModelProperty(example = "false")
+  private Boolean _reassignInstances = false;
+  @JsonProperty("includeConsuming")
+  @ApiModelProperty(example = "false")
+  private Boolean _includeConsuming = false;
+  @JsonProperty("bootstrap")
+  @ApiModelProperty(example = "false")
+  private Boolean _bootstrap = false;
+  @JsonProperty("downtime")
+  @ApiModelProperty(example = "false")
+  private Boolean _downtime = false;
+  @JsonProperty("minAvailableReplicas")
+  @ApiModelProperty(example = "1")
+  private Integer _minAvailableReplicas = 1;
+  @JsonProperty("bestEfforts")
+  @ApiModelProperty(example = "false")
+  private Boolean _bestEfforts = false;
+  @JsonProperty("externalViewCheckIntervalInMs")
+  @ApiModelProperty(example = "1000")
+  private Long _externalViewCheckIntervalInMs = 1000L;
+  @JsonProperty("externalViewStabilizationTimeoutInMs")
+  @ApiModelProperty(example = "3600000")
+  private Long _externalViewStabilizationTimeoutInMs = 3600000L;
+  @JsonProperty("updateTargetTier")
+  @ApiModelProperty(example = "false")
+  private Boolean _updateTargetTier = false;
+
+  public Boolean isDryRun() {
+    return _dryRun;
+  }
+
+  public void setDryRun(Boolean dryRun) {
+    _dryRun = dryRun;
+  }
+
+  public Boolean isReassignInstances() {
+    return _reassignInstances;
+  }
+
+  public void setReassignInstances(Boolean reassignInstances) {
+    _reassignInstances = reassignInstances;
+  }
+
+  public Boolean isIncludeConsuming() {
+    return _includeConsuming;
+  }
+
+  public void setIncludeConsuming(Boolean includeConsuming) {
+    _includeConsuming = includeConsuming;
+  }
+
+  public Boolean isBootstrap() {
+    return _bootstrap;
+  }
+
+  public void setBootstrap(Boolean bootstrap) {
+    _bootstrap = bootstrap;
+  }
+
+  public Boolean isDowntime() {
+    return _downtime;
+  }
+
+  public void setDowntime(Boolean downtime) {
+    _downtime = downtime;
+  }
+
+  public Integer getMinAvailableReplicas() {
+    return _minAvailableReplicas;
+  }
+
+  public void setMinAvailableReplicas(Integer minAvailableReplicas) {
+    _minAvailableReplicas = minAvailableReplicas;
+  }
+
+  public Boolean isBestEfforts() {
+    return _bestEfforts;
+  }
+
+  public void setBestEfforts(Boolean bestEfforts) {
+    _bestEfforts = bestEfforts;
+  }
+
+  public Long getExternalViewCheckIntervalInMs() {
+    return _externalViewCheckIntervalInMs;
+  }
+
+  public void setExternalViewCheckIntervalInMs(Long 
externalViewCheckIntervalInMs) {
+    _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs;
+  }
+
+  public Long getExternalViewStabilizationTimeoutInMs() {
+    return _externalViewStabilizationTimeoutInMs;
+  }
+
+  public void setExternalViewStabilizationTimeoutInMs(Long 
externalViewStabilizationTimeoutInMs) {
+    _externalViewStabilizationTimeoutInMs = 
externalViewStabilizationTimeoutInMs;
+  }
+
+  public Boolean isUpdateTargetTier() {
+    return _updateTargetTier;
+  }
+
+  public void setUpdateTargetTier(Boolean updateTargetTier) {
+    _updateTargetTier = updateTargetTier;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index c76d06fcaf..ed3ad624d8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.rebalance;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -28,11 +29,15 @@ import 
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 
 
 @JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
 public class RebalanceResult {
   private final String _jobId;
   private final Status _status;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<InstancePartitionsType, InstancePartitions> 
_instanceAssignment;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<String, InstancePartitions> _tierInstanceAssignment;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<String, Map<String, String>> _segmentAssignment;
   private final String _description;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
new file mode 100644
index 0000000000..d0278c15e3
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.google.common.collect.Sets;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultTenantRebalancer implements TenantRebalancer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultTenantRebalancer.class);
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  ExecutorService _executorService;
+
+  public DefaultTenantRebalancer(PinotHelixResourceManager 
pinotHelixResourceManager, ExecutorService executorService) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executorService = executorService;
+  }
+
+  @Override
+  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+    Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
+    Set<String> tables = getTenantTables(context.getTenantName());
+    tables.forEach(table -> {
+      try {
+        Configuration config = extractRebalanceConfig(context);
+        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceResult.put(table, 
_pinotHelixResourceManager.rebalanceTable(table, config, false));
+      } catch (TableNotFoundException exception) {
+        rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
+            null, null, null));
+      }
+    });
+    if (context.isDryRun() || context.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, 
context.isVerboseResult());
+    } else {
+      for (String table : rebalanceResult.keySet()) {
+        RebalanceResult result = rebalanceResult.get(table);
+        if (result.getStatus() == RebalanceResult.Status.DONE) {
+          rebalanceResult.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
+              "In progress, check controller task status for the", 
result.getInstanceAssignment(),
+              result.getTierInstanceAssignment(), 
result.getSegmentAssignment()));
+        }
+      }
+    }
+
+    String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
+    TenantRebalanceObserver observer = new 
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+        tables, _pinotHelixResourceManager);
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, 
null);
+    final Deque<String> sequentialQueue = new LinkedList<>();
+    final Deque<String> parallelQueue = new ConcurrentLinkedDeque<>();
+    // ensure atleast 1 thread is created to run the sequential table 
rebalance operations
+    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
+    Set<String> dimTables = getDimensionalTables(context.getTenantName());
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      if (parallelism > 1) {
+        Set<String> parallelTables;
+        if (!context.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        } else {
+          parallelTables = new HashSet<>(tables);
+        }
+        if (!context.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, 
context.getParallelBlacklist());
+        }
+        parallelTables.forEach(table -> {
+          if (dimTables.contains(table)) {
+            // prioritise dimension tables
+            parallelQueue.addFirst(table);
+          } else {
+            parallelQueue.addLast(table);
+          }
+        });
+        Sets.difference(tables, parallelTables).forEach(table -> {
+          if (dimTables.contains(table)) {
+            // prioritise dimension tables
+            sequentialQueue.addFirst(table);
+          } else {
+            sequentialQueue.addLast(table);
+          }
+        });
+      } else {
+        tables.forEach(table -> {
+          if (dimTables.contains(table)) {
+            // prioritise dimension tables
+            sequentialQueue.addFirst(table);
+          } else {
+            sequentialQueue.addLast(table);
+          }
+        });
+      }
+
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          while (true) {
+            String table = parallelQueue.pollFirst();
+            if (table == null) {
+              break;
+            }
+            Configuration config = extractRebalanceConfig(context);
+            config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+            config.setProperty(RebalanceConfigConstants.JOB_ID, 
rebalanceResult.get(table).getJobId());
+            rebalanceTable(table, config, observer);
+          }
+          // Last parallel thread to finish the table rebalance job will pick 
up the
+          // sequential table rebalance execution
+          if (activeThreads.decrementAndGet() == 0) {
+            Configuration config = extractRebalanceConfig(context);
+            config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+            while (true) {
+              String table = sequentialQueue.pollFirst();
+              if (table == null) {
+                break;
+              }
+              config.setProperty(RebalanceConfigConstants.JOB_ID, 
rebalanceResult.get(table).getJobId());
+              rebalanceTable(table, config, observer);
+            }
+            observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", context.getTenantName()));
+          }
+        });
+      }
+    } catch (Exception exception) {
+      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", context.getTenantName(),
+          exception.getMessage()));
+    }
+    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, 
context.isVerboseResult());
+  }
+
+  private Set<String> getDimensionalTables(String tenantName) {
+    Set<String> dimTables = new HashSet<>();
+    for (String table : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(table);
+      if (tableConfig == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", table);
+        continue;
+      }
+      if (tenantName.equals(tableConfig.getTenantConfig().getServer()) && 
tableConfig.isDimTable()) {
+        dimTables.add(table);
+      }
+    }
+    return dimTables;
+  }
+
+  private Configuration extractRebalanceConfig(TenantRebalanceContext context) 
{
+    Configuration rebalanceConfig = new BaseConfiguration();
+    rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, 
context.isDryRun());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
context.isReassignInstances());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
context.isIncludeConsuming());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, 
context.isBootstrap());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, 
context.isDowntime());
+    
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
+        context.getMinAvailableReplicas());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
context.isBestEfforts());
+    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
+        context.getExternalViewCheckIntervalInMs());
+    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
+        context.getExternalViewStabilizationTimeoutInMs());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, 
context.isUpdateTargetTier());
+    rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, 
createUniqueRebalanceJobIdentifier());
+    return rebalanceConfig;
+  }
+
+  private String createUniqueRebalanceJobIdentifier() {
+    return UUID.randomUUID().toString();
+  }
+
+  private Set<String> getTenantTables(String tenantName) {
+    Set<String> tables = new HashSet<>();
+    for (String table : _pinotHelixResourceManager.getAllTables()) {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(table);
+      if (tableConfig == null) {
+        LOGGER.error("Unable to retrieve table config for table: {}", table);
+        continue;
+      }
+      String tableConfigTenant = tableConfig.getTenantConfig().getServer();
+      if (tenantName.equals(tableConfigTenant)) {
+        tables.add(table);
+      }
+    }
+    return tables;
+  }
+
+  private void rebalanceTable(String tableName, Configuration config,
+      TenantRebalanceObserver observer) {
+    try {
+      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName,
+          config.getString(RebalanceConfigConstants.JOB_ID));
+      RebalanceResult result = 
_pinotHelixResourceManager.rebalanceTable(tableName, config, true);
+      if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
+        
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
+      } else {
+        
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
+            result.getDescription());
+      }
+    } catch (Throwable t) {
+      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, 
tableName,
+          String.format("Caught exception/error while rebalancing table: %s", 
tableName));
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
new file mode 100644
index 0000000000..5e76dcc014
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext;
+
+
+/**
+ * Rebalance options for a whole tenant: everything carried by {@link RebalanceContext} plus the tenant-level
+ * settings declared here.
+ */
+public class TenantRebalanceContext extends RebalanceContext {
+  // Not part of the JSON representation; populated through setTenantName().
+  @JsonIgnore
+  private String _tenantName;
+
+  // Defaults to 1.
+  @JsonProperty("degreeOfParallelism")
+  @ApiModelProperty(example = "1")
+  private Integer _degreeOfParallelism = 1;
+
+  // NOTE(review): presumably the tables allowed / not allowed to be rebalanced in parallel -- confirm against the
+  // tenant rebalancer implementation. Both default to empty sets.
+  @JsonProperty("parallelWhitelist")
+  private Set<String> _parallelWhitelist = new HashSet<>();
+  @JsonProperty("parallelBlacklist")
+  private Set<String> _parallelBlacklist = new HashSet<>();
+
+  // Defaults to false (the implicit default of a boolean field).
+  private boolean _verboseResult;
+
+  /** Returns the name of the tenant to rebalance. */
+  public String getTenantName() {
+    return _tenantName;
+  }
+
+  /** Sets the name of the tenant to rebalance. */
+  public void setTenantName(String tenantName) {
+    _tenantName = tenantName;
+  }
+
+  /** Returns the configured degree of parallelism. */
+  public int getDegreeOfParallelism() {
+    return _degreeOfParallelism;
+  }
+
+  /** Sets the degree of parallelism. */
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    _degreeOfParallelism = degreeOfParallelism;
+  }
+
+  /** Returns the parallel whitelist. */
+  public Set<String> getParallelWhitelist() {
+    return _parallelWhitelist;
+  }
+
+  /** Replaces the parallel whitelist. */
+  public void setParallelWhitelist(Set<String> parallelWhitelist) {
+    _parallelWhitelist = parallelWhitelist;
+  }
+
+  /** Returns the parallel blacklist. */
+  public Set<String> getParallelBlacklist() {
+    return _parallelBlacklist;
+  }
+
+  /** Replaces the parallel blacklist. */
+  public void setParallelBlacklist(Set<String> parallelBlacklist) {
+    _parallelBlacklist = parallelBlacklist;
+  }
+
+  /** Returns whether a verbose result was requested. */
+  public boolean isVerboseResult() {
+    return _verboseResult;
+  }
+
+  /** Sets whether a verbose result is requested. */
+  public void setVerboseResult(boolean verboseResult) {
+    _verboseResult = verboseResult;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
similarity index 59%
copy from 
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
copy to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
index aaad454787..82dd086362 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java
@@ -16,8 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metadata.controllerjob;
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
-public enum ControllerJobType {
-  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+/**
+ * Callback interface through which a tenant rebalance reports its progress and final outcome.
+ */
+public interface TenantRebalanceObserver {
+  /**
+   * Progress events delivered through {@link #onTrigger(Trigger, String, String)}.
+   */
+  enum Trigger {
+    // The tenant rebalance has started
+    START_TRIGGER,
+    // The rebalance of a table has started
+    REBALANCE_STARTED_TRIGGER,
+    // The rebalance of a table has completed
+    REBALANCE_COMPLETED_TRIGGER,
+    // The rebalance of a table has failed
+    REBALANCE_ERRORED_TRIGGER
+  }
+
+  /**
+   * Reports a progress event.
+   *
+   * @param trigger the kind of event
+   * @param tableName the table the event refers to
+   * @param description event-specific detail, may be null
+   */
+  void onTrigger(Trigger trigger, String tableName, String description);
+
+  /**
+   * Reports that the tenant rebalance finished successfully.
+   *
+   * @param msg completion message
+   */
+  void onSuccess(String msg);
+
+  /**
+   * Reports that the tenant rebalance failed.
+   *
+   * @param errorMsg failure message
+   */
+  void onError(String errorMsg);
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
new file mode 100644
index 0000000000..fb77100ece
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceProgressStats.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+/**
+ * Progress snapshot of a tenant rebalance: per-table status, per-table rebalance job ids, table counters and timing.
+ *
+ * <p>Table statuses are stored as plain strings. {@link TableStatus} supplies the regular values, but callers may
+ * store any string (for example an error description) as the status of a table.
+ */
+public class TenantRebalanceProgressStats {
+  // status map of tables and their respective rebalance status; never left null by the constructors so that
+  // updateTableStatus() is safe on an instance created through the no-arg constructor
+  private Map<String, String> _tableStatusMap = new HashMap<>();
+  // rebalance job id of each table, keyed by table name
+  private final Map<String, String> _tableRebalanceJobIdMap = new HashMap<>();
+  private int _totalTables;
+  private int _remainingTables;
+  // When did Rebalance start
+  private long _startTimeMs;
+  // How long did rebalance take
+  private long _timeToFinishInSeconds;
+  // Success/failure message
+  private String _completionStatusMsg;
+
+  /**
+   * Creates empty stats: no tables, zero counters and an empty status map.
+   */
+  public TenantRebalanceProgressStats() {
+  }
+
+  /**
+   * Creates stats for the given tables, all marked {@link TableStatus#UNPROCESSED}, with both the total and the
+   * remaining counters set to the number of tables.
+   *
+   * @param tables names of the tables taking part in the tenant rebalance
+   */
+  public TenantRebalanceProgressStats(Set<String> tables) {
+    _tableStatusMap = tables.stream()
+        .collect(Collectors.toMap(Function.identity(), k -> TableStatus.UNPROCESSED.name()));
+    _totalTables = tables.size();
+    _remainingTables = _totalTables;
+  }
+
+  public Map<String, String> getTableStatusMap() {
+    return _tableStatusMap;
+  }
+
+  public void setTableStatusMap(Map<String, String> tableStatusMap) {
+    _tableStatusMap = tableStatusMap;
+  }
+
+  public int getTotalTables() {
+    return _totalTables;
+  }
+
+  public void setTotalTables(int totalTables) {
+    _totalTables = totalTables;
+  }
+
+  public int getRemainingTables() {
+    return _remainingTables;
+  }
+
+  public void setRemainingTables(int remainingTables) {
+    _remainingTables = remainingTables;
+  }
+
+  public long getStartTimeMs() {
+    return _startTimeMs;
+  }
+
+  public void setStartTimeMs(long startTimeMs) {
+    _startTimeMs = startTimeMs;
+  }
+
+  public long getTimeToFinishInSeconds() {
+    return _timeToFinishInSeconds;
+  }
+
+  public void setTimeToFinishInSeconds(long timeToFinishInSeconds) {
+    _timeToFinishInSeconds = timeToFinishInSeconds;
+  }
+
+  public String getCompletionStatusMsg() {
+    return _completionStatusMsg;
+  }
+
+  public void setCompletionStatusMsg(String completionStatusMsg) {
+    _completionStatusMsg = completionStatusMsg;
+  }
+
+  /**
+   * Records the status of a table, replacing any previous status.
+   *
+   * @param tableName table whose status changed
+   * @param status new status string
+   */
+  public void updateTableStatus(String tableName, String status) {
+    _tableStatusMap.put(tableName, status);
+  }
+
+  /**
+   * Records the rebalance job id of a table.
+   *
+   * @param tableName table being rebalanced
+   * @param jobId id of the rebalance job of that table
+   */
+  public void putTableRebalanceJobId(String tableName, String jobId) {
+    _tableRebalanceJobIdMap.put(tableName, jobId);
+  }
+
+  public Map<String, String> getTableRebalanceJobIdMap() {
+    return _tableRebalanceJobIdMap;
+  }
+
+  /**
+   * Regular status values of a table during a tenant rebalance.
+   */
+  public enum TableStatus {
+    UNPROCESSED, PROCESSING, PROCESSED
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
new file mode 100644
index 0000000000..57c17054d7
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TenantRebalanceResult {
+  private String _jobId;
+  private Map<String, RebalanceResult> _rebalanceTableResults;
+
+  public TenantRebalanceResult(String jobId, Map<String, RebalanceResult> 
rebalanceTableResults, boolean verbose) {
+    _jobId = jobId;
+    if (verbose) {
+      _rebalanceTableResults = rebalanceTableResults;
+    } else {
+      _rebalanceTableResults = new HashMap<>();
+      rebalanceTableResults.forEach((table, result) -> {
+        _rebalanceTableResults.put(table, new 
RebalanceResult(result.getJobId(), result.getStatus(),
+            result.getDescription(), null, null, null));
+      });
+    }
+  }
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public Map<String, RebalanceResult> getRebalanceTableResults() {
+    return _rebalanceTableResults;
+  }
+
+  public void setJobId(String jobId) {
+    _jobId = jobId;
+  }
+
+  public void setRebalanceTableResults(Map<String, RebalanceResult> 
rebalanceTableResults) {
+    _rebalanceTableResults = rebalanceTableResults;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
similarity index 82%
copy from 
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
copy to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index aaad454787..53df7824d5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metadata.controllerjob;
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
-public enum ControllerJobType {
-  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+
+/**
+ * Service that rebalances the tables of a tenant.
+ */
+public interface TenantRebalancer {
+  /**
+   * Runs a tenant rebalance as described by the given context.
+   *
+   * @param context tenant name and rebalance options
+   * @return the tenant rebalance job id together with the per-table results
+   */
+  TenantRebalanceResult rebalance(TenantRebalanceContext context);
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
new file mode 100644
index 0000000000..7521caa3f3
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link TenantRebalanceObserver} that keeps a {@link TenantRebalanceProgressStats} up to date and writes it to ZK
+ * as controller job metadata of type {@code TENANT_REBALANCE} after every notification.
+ *
+ * <p>NOTE(review): none of the callbacks is synchronized. If notifications can arrive from several rebalance
+ * threads at once, the updates of the unprocessed-table list and of the stats may race -- confirm how the tenant
+ * rebalancer invokes this observer.
+ */
+public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class);
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final String _jobId;
+  private final String _tenantName;
+  // Tables that have neither completed nor failed yet; its size is the "remaining tables" counter.
+  private final List<String> _unprocessedTables;
+  private final TenantRebalanceProgressStats _progressStats;
+  // Keep track of number of updates. Useful during debugging.
+  private int _numUpdatesToZk;
+
+  /**
+   * @param jobId id of the tenant rebalance job
+   * @param tenantName tenant being rebalanced
+   * @param tables tables taking part in the rebalance; must be neither null nor empty
+   * @param pinotHelixResourceManager used to persist the job metadata to ZK
+   * @throws IllegalStateException if {@code tables} is null or empty
+   */
+  public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, Set<String> tables,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    Preconditions.checkState(tables != null && !tables.isEmpty(), "List of tables to observe is empty.");
+    _jobId = jobId;
+    _tenantName = tenantName;
+    _unprocessedTables = new ArrayList<>(tables);
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _progressStats = new TenantRebalanceProgressStats(tables);
+    _numUpdatesToZk = 0;
+  }
+
+  /**
+   * Applies the event to the progress stats and persists them.
+   *
+   * <ul>
+   *   <li>{@code START_TRIGGER}: records the start time.</li>
+   *   <li>{@code REBALANCE_STARTED_TRIGGER}: marks the table PROCESSING and stores {@code description} as the
+   *       rebalance job id of the table.</li>
+   *   <li>{@code REBALANCE_COMPLETED_TRIGGER}: marks the table PROCESSED and decrements the remaining tables.</li>
+   *   <li>{@code REBALANCE_ERRORED_TRIGGER}: stores {@code description} as the status of the table and decrements
+   *       the remaining tables.</li>
+   * </ul>
+   */
+  @Override
+  public void onTrigger(Trigger trigger, String tableName, String description) {
+    switch (trigger) {
+      case START_TRIGGER:
+        _progressStats.setStartTimeMs(System.currentTimeMillis());
+        break;
+      case REBALANCE_STARTED_TRIGGER:
+        _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSING.name());
+        _progressStats.putTableRebalanceJobId(tableName, description);
+        break;
+      case REBALANCE_COMPLETED_TRIGGER:
+        _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
+        _unprocessedTables.remove(tableName);
+        _progressStats.setRemainingTables(_unprocessedTables.size());
+        break;
+      case REBALANCE_ERRORED_TRIGGER:
+        _progressStats.updateTableStatus(tableName, description);
+        _unprocessedTables.remove(tableName);
+        _progressStats.setRemainingTables(_unprocessedTables.size());
+        break;
+      default:
+    }
+    trackStatsInZk();
+  }
+
+  /**
+   * Records the completion message and the elapsed time in seconds, then persists the stats.
+   */
+  @Override
+  public void onSuccess(String msg) {
+    _progressStats.setCompletionStatusMsg(msg);
+    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - _progressStats.getStartTimeMs()) / 1000);
+    trackStatsInZk();
+  }
+
+  /**
+   * Records the error message and the elapsed time in seconds, then persists the stats.
+   */
+  @Override
+  public void onError(String errorMsg) {
+    _progressStats.setCompletionStatusMsg(errorMsg);
+    // The field holds seconds: convert the elapsed milliseconds exactly as onSuccess() does.
+    _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - _progressStats.getStartTimeMs()) / 1000);
+    trackStatsInZk();
+  }
+
+  /**
+   * Writes the job metadata (tenant name, job id, current time as submission time, job type and the JSON-serialised
+   * progress stats) to ZK. If the stats cannot be serialised the error is logged and the metadata is written
+   * without them.
+   */
+  private void trackStatsInZk() {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name());
+    try {
+      jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+          JsonUtils.objectToString(_progressStats));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e);
+    }
+    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata,
+        ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TENANT_REBALANCE));
+    _numUpdatesToZk++;
+    LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", _numUpdatesToZk, _jobId);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
new file mode 100644
index 0000000000..a4237fee3f
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test for tenant rebalance: rebalances the tables of a test tenant and of the default tenant against
+ * a controller with fake broker and server instances, and checks the results and the progress stats stored in ZK.
+ */
+public class TenantRebalancerTest extends ControllerTest {
+
+  private static final String DEFAULT_TENANT_NAME = "DefaultTenant";
+  private static final String TENANT_NAME = "TestTenant";
+  private static final String RAW_TABLE_NAME_A = "testTableA";
+  private static final String OFFLINE_TABLE_NAME_A = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_A);
+  private static final String RAW_TABLE_NAME_B = "testTableB";
+  private static final String OFFLINE_TABLE_NAME_B = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_B);
+  private static final int NUM_REPLICAS = 3;
+  ExecutorService _executorService;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    _executorService = Executors.newFixedThreadPool(3);
+  }
+
+  @Test
+  public void testRebalance()
+      throws Exception {
+    int numServers = 3;
+    for (int i = 0; i < numServers; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true);
+    }
+
+    TenantRebalancer tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _executorService);
+
+    // tag all servers and brokers to test tenant
+    addTenantTagToInstances(TENANT_NAME);
+
+    // create 2 tables, one on each of test tenant and default tenant
+    createTableWithSegments(RAW_TABLE_NAME_A, DEFAULT_TENANT_NAME);
+    createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME);
+
+    // Add 3 more servers which will be tagged to default tenant
+    int numServersToAdd = 3;
+    for (int i = 0; i < numServersToAdd; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + (numServers + i), true);
+    }
+
+    Map<String, Map<String, String>> oldSegmentAssignment =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
+
+    // rebalance the tables on test tenant
+    TenantRebalanceContext context = new TenantRebalanceContext();
+    context.setTenantName(TENANT_NAME);
+    context.setVerboseResult(true);
+    TenantRebalanceResult result = tenantRebalancer.rebalance(context);
+    RebalanceResult rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
+    Map<String, Map<String, String>> rebalancedAssignment = rebalanceResult.getSegmentAssignment();
+    // assignment should not change, with a NO_OP status as no new server is added to test tenant
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    assertEquals(oldSegmentAssignment, rebalancedAssignment);
+
+    // rebalance the tables on default tenant
+    context.setTenantName(DEFAULT_TENANT_NAME);
+    result = tenantRebalancer.rebalance(context);
+    // rebalancing default tenant should distribute the segment of table A over 6 servers
+    rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
+    InstancePartitions partitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
+    assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 6);
+
+    // ensure the ideal state and external view converges
+    assertTrue(waitForCompletion(result.getJobId()));
+    TenantRebalanceProgressStats progressStats = getProgress(result.getJobId());
+    assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A));
+    assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A),
+        TenantRebalanceProgressStats.TableStatus.PROCESSED.name());
+    Map<String, Map<String, String>> idealState =
+        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+    Map<String, Map<String, String>> externalView =
+        _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields();
+    assertEquals(idealState, externalView);
+  }
+
+  /**
+   * Polls the progress stats of the job up to 5 times, sleeping 2 seconds between attempts.
+   *
+   * @return true once the stats report no remaining tables; false if the retries run out, the stats cannot be
+   *     parsed, or the thread is interrupted
+   */
+  private boolean waitForCompletion(String jobId) {
+    int retries = 5;
+    while (retries > 0) {
+      try {
+        TenantRebalanceProgressStats stats = getProgress(jobId);
+        if (stats != null && stats.getRemainingTables() == 0) {
+          return true;
+        }
+        retries--;
+        Thread.sleep(2000);
+      } catch (JsonProcessingException e) {
+        return false;
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so that the test runner can still observe the interruption.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Reads the progress stats of the given tenant rebalance job from its ZK job metadata.
+   *
+   * @return the parsed stats, or null if no metadata exists for the job
+   */
+  private TenantRebalanceProgressStats getProgress(String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE);
+    if (controllerJobZKMetadata == null) {
+      return null;
+    }
+    return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+        TenantRebalanceProgressStats.class);
+  }
+
+  /**
+   * Creates an OFFLINE table on the given tenant (used as both server and broker tenant) and adds 10 mocked
+   * segments to it.
+   */
+  private void createTableWithSegments(String rawTableName, String tenant)
+      throws IOException {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
+        .setServerTenant(tenant).setBrokerTenant(tenant).setNumReplicas(NUM_REPLICAS).build();
+    // Create the table
+    _helixResourceManager.addTable(tableConfig);
+    // Add the segments
+    int numSegments = 10;
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    for (int i = 0; i < numSegments; i++) {
+      _helixResourceManager.addNewSegment(offlineTableName,
+          SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName, "segment_" + i), null);
+    }
+  }
+
+  /**
+   * Adds the tenant's offline tag to every server instance and its broker tag to every broker instance, keeping
+   * the tags the instances already have.
+   */
+  private void addTenantTagToInstances(String testTenant) {
+    String offlineTag = TagNameUtils.getOfflineTagForTenant(testTenant);
+    String brokerTag = TagNameUtils.getBrokerTagForTenant(testTenant);
+    _helixResourceManager.getAllInstances().forEach(instance -> {
+      List<String> existingTags = _helixResourceManager.getHelixInstanceConfig(instance).getTags();
+      if (instance.startsWith(SERVER_INSTANCE_ID_PREFIX)) {
+        existingTags.add(offlineTag);
+      } else if (instance.startsWith(BROKER_INSTANCE_ID_PREFIX)) {
+        existingTags.add(brokerTag);
+      }
+      _helixResourceManager.updateInstanceTags(instance, String.join(",", existingTags), true);
+    });
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+    _executorService.shutdown();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 5c1516210d..e72d066bc9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -80,6 +80,7 @@ public class Actions {
     public static final String UPDATE_TASK_QUEUE = "UpdateTaskQueue";
     public static final String UPDATE_TENANT = "UpdateTenant";
     public static final String UPDATE_TENANT_METADATA = "UpdateTenantMetadata";
+    public static final String REBALANCE_TENANT_TABLES = 
"RebalanceTenantTables";
     public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval";
     public static final String UPDATE_USER = "UpdateUser";
     public static final String UPDATE_ZNODE = "UpdateZnode";
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 180744a831..46b2c5148e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -754,6 +754,7 @@ public class CommonConstants {
      */
     public static final String JOB_TYPE = "jobType";
     public static final String TABLE_NAME_WITH_TYPE = "tableName";
+    public static final String TENANT_NAME = "tenantName";
     public static final String JOB_ID = "jobId";
     public static final String SUBMISSION_TIME_MS = "submissionTimeMs";
     public static final String MESSAGE_COUNT = "messageCount";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to