This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d6b1b4feba Tenant rebalance and status tracking APIs (#11128) d6b1b4feba is described below commit d6b1b4feba1f9a2168f5119e1b673d7cfcf8d146 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Sat Aug 12 02:23:10 2023 +0530 Tenant rebalance and status tracking APIs (#11128) * endpoint to rebalance tables of a tenant * move tenant rebalance logic to a separate TenantRebalancer service * implement extractRebalanceConfig * function rename * observability * status reporting * moved tenant rebalance related classes to a separate package * ignore null values in response on RebalanceResult * bugfix, initiate the _remainingTables field * integration test for tenant rebalance * break while loop gracefully * reuse the last active parallel thread to run the sequential jobs * metadata npe handling * maintain separate executor service for tenant rebalance operations * prioritise dimension tables * var name fix * todo comment added * Add Authorize annotations to APIs --- .../metadata/controllerjob/ControllerJobType.java | 2 +- .../pinot/controller/BaseControllerStarter.java | 10 + .../api/resources/PinotTenantRestletResource.java | 55 +++++ .../TenantRebalanceJobStatusResponse.java | 43 ++++ .../helix/core/rebalance/RebalanceContext.java | 138 ++++++++++++ .../helix/core/rebalance/RebalanceResult.java | 5 + .../rebalance/tenant/DefaultTenantRebalancer.java | 234 +++++++++++++++++++++ .../rebalance/tenant/TenantRebalanceContext.java | 81 +++++++ .../rebalance/tenant/TenantRebalanceObserver.java | 21 +- .../tenant/TenantRebalanceProgressStats.java | 114 ++++++++++ .../rebalance/tenant/TenantRebalanceResult.java | 59 ++++++ .../core/rebalance/tenant/TenantRebalancer.java | 7 +- .../tenant/ZkBasedTenantRebalanceObserver.java | 116 ++++++++++ .../rebalance/tenant/TenantRebalancerTest.java | 191 +++++++++++++++++ .../java/org/apache/pinot/core/auth/Actions.java | 1 + 
.../apache/pinot/spi/utils/CommonConstants.java | 1 + 16 files changed, 1071 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java index aaad454787..025d7b6c39 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java @@ -19,5 +19,5 @@ package org.apache.pinot.common.metadata.controllerjob; public enum ControllerJobType { - RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE + RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 594ca27bfd..610c0a01f0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -85,6 +85,8 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; +import org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator; import org.apache.pinot.controller.helix.core.retention.RetentionManager; import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory; @@ -173,6 +175,8 @@ public abstract class 
BaseControllerStarter implements ServiceStartable { protected StaleInstancesCleanupTask _staleInstancesCleanupTask; protected TaskMetricsEmitter _taskMetricsEmitter; protected MultiThreadedHttpConnectionManager _connectionManager; + protected TenantRebalancer _tenantRebalancer; + protected ExecutorService _tenantRebalanceExecutorService; @Override public void init(PinotConfiguration pinotConfiguration) @@ -223,6 +227,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { // This executor service is used to do async tasks from multiget util or table rebalancing. _executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build()); + _tenantRebalanceExecutorService = + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("tenant-rebalance-thread-%d").build()); + _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } // Initialize the table config tuner registry. 
@@ -484,6 +491,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class); bind(_sqlQueryExecutor).to(SqlQueryExecutor.class); bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class); + bind(_tenantRebalancer).to(TenantRebalancer.class); String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR); if (loggerRootDir != null) { bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class); @@ -781,6 +789,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Shutting down executor service"); _executorService.shutdownNow(); _executorService.awaitTermination(10L, TimeUnit.SECONDS); + _tenantRebalanceExecutorService.shutdownNow(); + _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS); } catch (final Exception e) { LOGGER.error("Caught exception while shutting down", e); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index 70a24e15e4..f160bba80d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -32,6 +32,7 @@ import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import javax.inject.Inject; @@ -49,6 +50,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import 
org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.api.access.AccessType; @@ -56,6 +58,10 @@ import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; @@ -64,6 +70,7 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.RebalanceConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +113,9 @@ public class PinotTenantRestletResource { @Inject ControllerMetrics _controllerMetrics; + @Inject + TenantRebalancer _tenantRebalancer; + @POST @Path("/tenants") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CREATE_TENANT) @@ -565,4 +575,49 @@ public class PinotTenantRestletResource { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_TENANT_DELETE_ERROR, 1L); throw new ControllerApplicationException(LOGGER, "Error deleting tenant", Response.Status.INTERNAL_SERVER_ERROR); } + + @POST + @Produces(MediaType.APPLICATION_JSON) + @Authenticate(AccessType.UPDATE) + @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.REBALANCE_TENANT_TABLES) + @Path("/tenants/{tenantName}/rebalance") + @ApiOperation(value = "Rebalances all the tables that are part of the tenant") + public TenantRebalanceResult rebalance( + @ApiParam(value = "Name of the tenant whose table are to be rebalanced", required = true) + @PathParam("tenantName") String tenantName, @ApiParam(required = true) TenantRebalanceContext context) { + context.setTenantName(tenantName); + return _tenantRebalancer.rebalance(context); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Authenticate(AccessType.READ) + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_REBALANCE_STATUS) + @Path("/tenants/rebalanceStatus/{jobId}") + @ApiOperation(value = "Gets detailed stats of a tenant rebalance operation", + notes = "Gets detailed stats of a tenant rebalance operation") + public TenantRebalanceJobStatusResponse rebalanceStatus( + @ApiParam(value = "Tenant rebalance job id", required = true) @PathParam("jobId") String jobId) + throws JsonProcessingException { + Map<String, String> controllerJobZKMetadata = + _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE); + + if (controllerJobZKMetadata == null) { + throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId, + Response.Status.NOT_FOUND); + } + TenantRebalanceProgressStats tenantRebalanceProgressStats = + JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS), + TenantRebalanceProgressStats.class); + long timeSinceStartInSecs = tenantRebalanceProgressStats.getTimeToFinishInSeconds(); + if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) { + timeSinceStartInSecs = + (System.currentTimeMillis() - tenantRebalanceProgressStats.getStartTimeMs()) / 1000; + } + + TenantRebalanceJobStatusResponse tenantRebalanceJobStatusResponse = new TenantRebalanceJobStatusResponse(); + 
tenantRebalanceJobStatusResponse.setTenantRebalanceProgressStats(tenantRebalanceProgressStats); + tenantRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs); + return tenantRebalanceJobStatusResponse; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java new file mode 100644 index 0000000000..db1bfda9e8 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TenantRebalanceJobStatusResponse.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.resources; + +import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats; + + +public class TenantRebalanceJobStatusResponse { + private long _timeElapsedSinceStartInSeconds; + private TenantRebalanceProgressStats _tenantRebalanceProgressStats; + + public long getTimeElapsedSinceStartInSeconds() { + return _timeElapsedSinceStartInSeconds; + } + + public void setTimeElapsedSinceStartInSeconds(long timeElapsedSinceStartInSeconds) { + _timeElapsedSinceStartInSeconds = timeElapsedSinceStartInSeconds; + } + + public TenantRebalanceProgressStats getTenantRebalanceProgressStats() { + return _tenantRebalanceProgressStats; + } + + public void setTenantRebalanceProgressStats(TenantRebalanceProgressStats tenantRebalanceProgressStats) { + _tenantRebalanceProgressStats = tenantRebalanceProgressStats; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java new file mode 100644 index 0000000000..cd6e06c399 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel +public class RebalanceContext { + // TODO : simplify the rebalance configs wherever possible + @JsonProperty("dryRun") + @ApiModelProperty(example = "false") + private Boolean _dryRun = false; + @JsonProperty("reassignInstances") + @ApiModelProperty(example = "false") + private Boolean _reassignInstances = false; + @JsonProperty("includeConsuming") + @ApiModelProperty(example = "false") + private Boolean _includeConsuming = false; + @JsonProperty("bootstrap") + @ApiModelProperty(example = "false") + private Boolean _bootstrap = false; + @JsonProperty("downtime") + @ApiModelProperty(example = "false") + private Boolean _downtime = false; + @JsonProperty("minAvailableReplicas") + @ApiModelProperty(example = "1") + private Integer _minAvailableReplicas = 1; + @JsonProperty("bestEfforts") + @ApiModelProperty(example = "false") + private Boolean _bestEfforts = false; + @JsonProperty("externalViewCheckIntervalInMs") + @ApiModelProperty(example = "1000") + private Long _externalViewCheckIntervalInMs = 1000L; + @JsonProperty("externalViewStabilizationTimeoutInMs") + @ApiModelProperty(example = "3600000") + private Long _externalViewStabilizationTimeoutInMs = 3600000L; + @JsonProperty("updateTargetTier") + @ApiModelProperty(example = "false") + private Boolean _updateTargetTier = false; + + public Boolean isDryRun() { + return _dryRun; 
+ } + + public void setDryRun(Boolean dryRun) { + _dryRun = dryRun; + } + + public Boolean isReassignInstances() { + return _reassignInstances; + } + + public void setReassignInstances(Boolean reassignInstances) { + _reassignInstances = reassignInstances; + } + + public Boolean isIncludeConsuming() { + return _includeConsuming; + } + + public void setIncludeConsuming(Boolean includeConsuming) { + _includeConsuming = includeConsuming; + } + + public Boolean isBootstrap() { + return _bootstrap; + } + + public void setBootstrap(Boolean bootstrap) { + _bootstrap = bootstrap; + } + + public Boolean isDowntime() { + return _downtime; + } + + public void setDowntime(Boolean downtime) { + _downtime = downtime; + } + + public Integer getMinAvailableReplicas() { + return _minAvailableReplicas; + } + + public void setMinAvailableReplicas(Integer minAvailableReplicas) { + _minAvailableReplicas = minAvailableReplicas; + } + + public Boolean isBestEfforts() { + return _bestEfforts; + } + + public void setBestEfforts(Boolean bestEfforts) { + _bestEfforts = bestEfforts; + } + + public Long getExternalViewCheckIntervalInMs() { + return _externalViewCheckIntervalInMs; + } + + public void setExternalViewCheckIntervalInMs(Long externalViewCheckIntervalInMs) { + _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs; + } + + public Long getExternalViewStabilizationTimeoutInMs() { + return _externalViewStabilizationTimeoutInMs; + } + + public void setExternalViewStabilizationTimeoutInMs(Long externalViewStabilizationTimeoutInMs) { + _externalViewStabilizationTimeoutInMs = externalViewStabilizationTimeoutInMs; + } + + public Boolean isUpdateTargetTier() { + return _updateTargetTier; + } + + public void setUpdateTargetTier(Boolean updateTargetTier) { + _updateTargetTier = updateTargetTier; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java index c76d06fcaf..ed3ad624d8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.helix.core.rebalance; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Map; import javax.annotation.Nullable; @@ -28,11 +29,15 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; @JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) public class RebalanceResult { private final String _jobId; private final Status _status; + @JsonInclude(JsonInclude.Include.NON_NULL) private final Map<InstancePartitionsType, InstancePartitions> _instanceAssignment; + @JsonInclude(JsonInclude.Include.NON_NULL) private final Map<String, InstancePartitions> _tierInstanceAssignment; + @JsonInclude(JsonInclude.Include.NON_NULL) private final Map<String, Map<String, String>> _segmentAssignment; private final String _description; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java new file mode 100644 index 0000000000..d0278c15e3 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.google.common.collect.Sets; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.RebalanceConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultTenantRebalancer implements TenantRebalancer { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class); + PinotHelixResourceManager _pinotHelixResourceManager; + ExecutorService _executorService; + + public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, 
ExecutorService executorService) { + _pinotHelixResourceManager = pinotHelixResourceManager; + _executorService = executorService; + } + + @Override + public TenantRebalanceResult rebalance(TenantRebalanceContext context) { + Map<String, RebalanceResult> rebalanceResult = new HashMap<>(); + Set<String> tables = getTenantTables(context.getTenantName()); + tables.forEach(table -> { + try { + Configuration config = extractRebalanceConfig(context); + config.setProperty(RebalanceConfigConstants.DRY_RUN, true); + rebalanceResult.put(table, _pinotHelixResourceManager.rebalanceTable(table, config, false)); + } catch (TableNotFoundException exception) { + rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), + null, null, null)); + } + }); + if (context.isDryRun() || context.isDowntime()) { + return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult()); + } else { + for (String table : rebalanceResult.keySet()) { + RebalanceResult result = rebalanceResult.get(table); + if (result.getStatus() == RebalanceResult.Status.DONE) { + rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, + "In progress, check controller task status for the", result.getInstanceAssignment(), + result.getTierInstanceAssignment(), result.getSegmentAssignment())); + } + } + } + + String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier(); + TenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(), + tables, _pinotHelixResourceManager); + observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null); + final Deque<String> sequentialQueue = new LinkedList<>(); + final Deque<String> parallelQueue = new ConcurrentLinkedDeque<>(); + // ensure atleast 1 thread is created to run the sequential table rebalance operations + int parallelism = Math.max(context.getDegreeOfParallelism(), 1); + Set<String> 
dimTables = getDimensionalTables(context.getTenantName()); + AtomicInteger activeThreads = new AtomicInteger(parallelism); + try { + if (parallelism > 1) { + Set<String> parallelTables; + if (!context.getParallelWhitelist().isEmpty()) { + parallelTables = new HashSet<>(context.getParallelWhitelist()); + } else { + parallelTables = new HashSet<>(tables); + } + if (!context.getParallelBlacklist().isEmpty()) { + parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist()); + } + parallelTables.forEach(table -> { + if (dimTables.contains(table)) { + // prioritise dimension tables + parallelQueue.addFirst(table); + } else { + parallelQueue.addLast(table); + } + }); + Sets.difference(tables, parallelTables).forEach(table -> { + if (dimTables.contains(table)) { + // prioritise dimension tables + sequentialQueue.addFirst(table); + } else { + sequentialQueue.addLast(table); + } + }); + } else { + tables.forEach(table -> { + if (dimTables.contains(table)) { + // prioritise dimension tables + sequentialQueue.addFirst(table); + } else { + sequentialQueue.addLast(table); + } + }); + } + + for (int i = 0; i < parallelism; i++) { + _executorService.submit(() -> { + while (true) { + String table = parallelQueue.pollFirst(); + if (table == null) { + break; + } + Configuration config = extractRebalanceConfig(context); + config.setProperty(RebalanceConfigConstants.DRY_RUN, false); + config.setProperty(RebalanceConfigConstants.JOB_ID, rebalanceResult.get(table).getJobId()); + rebalanceTable(table, config, observer); + } + // Last parallel thread to finish the table rebalance job will pick up the + // sequential table rebalance execution + if (activeThreads.decrementAndGet() == 0) { + Configuration config = extractRebalanceConfig(context); + config.setProperty(RebalanceConfigConstants.DRY_RUN, false); + while (true) { + String table = sequentialQueue.pollFirst(); + if (table == null) { + break; + } + config.setProperty(RebalanceConfigConstants.JOB_ID, 
rebalanceResult.get(table).getJobId()); + rebalanceTable(table, config, observer); + } + observer.onSuccess(String.format("Successfully rebalanced tenant %s.", context.getTenantName())); + } + }); + } + } catch (Exception exception) { + observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", context.getTenantName(), + exception.getMessage())); + } + return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, context.isVerboseResult()); + } + + private Set<String> getDimensionalTables(String tenantName) { + Set<String> dimTables = new HashSet<>(); + for (String table : _pinotHelixResourceManager.getAllTables()) { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(table); + if (tableConfig == null) { + LOGGER.error("Unable to retrieve table config for table: {}", table); + continue; + } + if (tenantName.equals(tableConfig.getTenantConfig().getServer()) && tableConfig.isDimTable()) { + dimTables.add(table); + } + } + return dimTables; + } + + private Configuration extractRebalanceConfig(TenantRebalanceContext context) { + Configuration rebalanceConfig = new BaseConfiguration(); + rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, context.isDryRun()); + rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, context.isReassignInstances()); + rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, context.isIncludeConsuming()); + rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, context.isBootstrap()); + rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, context.isDowntime()); + rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, + context.getMinAvailableReplicas()); + rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, context.isBestEfforts()); + rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, + context.getExternalViewCheckIntervalInMs()); + 
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, + context.getExternalViewStabilizationTimeoutInMs()); + rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, context.isUpdateTargetTier()); + rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, createUniqueRebalanceJobIdentifier()); + return rebalanceConfig; + } + + private String createUniqueRebalanceJobIdentifier() { + return UUID.randomUUID().toString(); + } + + private Set<String> getTenantTables(String tenantName) { + Set<String> tables = new HashSet<>(); + for (String table : _pinotHelixResourceManager.getAllTables()) { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(table); + if (tableConfig == null) { + LOGGER.error("Unable to retrieve table config for table: {}", table); + continue; + } + String tableConfigTenant = tableConfig.getTenantConfig().getServer(); + if (tenantName.equals(tableConfigTenant)) { + tables.add(table); + } + } + return tables; + } + + private void rebalanceTable(String tableName, Configuration config, + TenantRebalanceObserver observer) { + try { + observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, + config.getString(RebalanceConfigConstants.JOB_ID)); + RebalanceResult result = _pinotHelixResourceManager.rebalanceTable(tableName, config, true); + if (result.getStatus().equals(RebalanceResult.Status.DONE)) { + observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null); + } else { + observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, + result.getDescription()); + } + } catch (Throwable t) { + observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, + String.format("Caught exception/error while rebalancing table: %s", tableName)); + } + } +} diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java new file mode 100644 index 0000000000..5e76dcc014 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import java.util.HashSet; +import java.util.Set; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext; + + +public class TenantRebalanceContext extends RebalanceContext { + @JsonIgnore + private String _tenantName; + @JsonProperty("degreeOfParallelism") + @ApiModelProperty(example = "1") + private Integer _degreeOfParallelism = 1; + @JsonProperty("parallelWhitelist") + private Set<String> _parallelWhitelist = new HashSet<>(); + @JsonProperty("parallelBlacklist") + private Set<String> _parallelBlacklist = new HashSet<>(); + + private boolean _verboseResult = false; + + public String getTenantName() { + return _tenantName; + } + + public void setTenantName(String tenantName) { + _tenantName = tenantName; + } + + public int getDegreeOfParallelism() { + return _degreeOfParallelism; + } + + public void setDegreeOfParallelism(int degreeOfParallelism) { + _degreeOfParallelism = degreeOfParallelism; + } + + public Set<String> getParallelWhitelist() { + return _parallelWhitelist; + } + + public void setParallelWhitelist(Set<String> parallelWhitelist) { + _parallelWhitelist = parallelWhitelist; + } + + public Set<String> getParallelBlacklist() { + return _parallelBlacklist; + } + + public void setParallelBlacklist(Set<String> parallelBlacklist) { + _parallelBlacklist = parallelBlacklist; + } + + public boolean isVerboseResult() { + return _verboseResult; + } + + public void setVerboseResult(boolean verboseResult) { + _verboseResult = verboseResult; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceObserver.java similarity index 59% copy from 
/**
 * Callback contract used to follow the progress of a tenant-wide rebalance.
 */
public interface TenantRebalanceObserver {

  /** Lifecycle events reported while a tenant rebalance is running. */
  enum Trigger {
    /** The tenant rebalance as a whole has started. */
    START_TRIGGER,
    /** Rebalance of an individual table has started. */
    REBALANCE_STARTED_TRIGGER,
    /** Rebalance of an individual table has completed. */
    REBALANCE_COMPLETED_TRIGGER,
    /** Rebalance of an individual table has failed. */
    REBALANCE_ERRORED_TRIGGER
  }

  /**
   * Reports a lifecycle event.
   *
   * @param trigger kind of event
   * @param tableName table the event refers to (presumably unused for {@link Trigger#START_TRIGGER})
   * @param description free-form detail; e.g. the ZK-backed observer treats it as the table's rebalance job id
   *     on REBALANCE_STARTED_TRIGGER and as the failure status on REBALANCE_ERRORED_TRIGGER
   */
  void onTrigger(Trigger trigger, String tableName, String description);

  /**
   * Reports that the tenant rebalance finished successfully.
   *
   * @param msg completion message
   */
  void onSuccess(String msg);

  /**
   * Reports that the tenant rebalance failed.
   *
   * @param errorMsg failure message
   */
  void onError(String errorMsg);
}
/**
 * Progress snapshot of a tenant rebalance. Instances are serialized to JSON (and back) so the progress can be
 * stored as controller-job metadata and read by status APIs/tests.
 */
public class TenantRebalanceProgressStats {
  // Table name -> rebalance status. Values are normally TableStatus names, but callers may store a free-form
  // status string (e.g. an error description for a failed table).
  private Map<String, String> _tableStatusMap;
  // Table name -> id of the table-level rebalance job started for it.
  private final Map<String, String> _tableRebalanceJobIdMap = new HashMap<>();
  private int _totalTables;
  private int _remainingTables;
  // When did Rebalance start (epoch millis, as supplied by the caller)
  private long _startTimeMs;
  // How long did rebalance take, in seconds
  private long _timeToFinishInSeconds;
  // Success/failure message
  private String _completionStatusMsg;

  /**
   * Creates an empty instance (used for JSON deserialization). The status map starts empty rather than null so
   * that {@link #updateTableStatus} is safe to call on a default-constructed instance.
   */
  public TenantRebalanceProgressStats() {
    _tableStatusMap = new HashMap<>();
  }

  /**
   * Creates stats tracking the given tables, all initially {@link TableStatus#UNPROCESSED}.
   *
   * @param tables tables taking part in the rebalance; must not be null
   */
  public TenantRebalanceProgressStats(Set<String> tables) {
    // Collectors.toMap makes no guarantee about the mutability of the returned map, but updateTableStatus
    // mutates it, so request a HashMap explicitly. Keys come from a Set, so the merge function never fires.
    _tableStatusMap = tables.stream()
        .collect(Collectors.toMap(Function.identity(), k -> TableStatus.UNPROCESSED.name(),
            (existing, duplicate) -> existing, HashMap::new));
    _totalTables = tables.size();
    _remainingTables = _totalTables;
  }

  public Map<String, String> getTableStatusMap() {
    return _tableStatusMap;
  }

  public void setTableStatusMap(Map<String, String> tableStatusMap) {
    _tableStatusMap = tableStatusMap;
  }

  public int getTotalTables() {
    return _totalTables;
  }

  public void setTotalTables(int totalTables) {
    _totalTables = totalTables;
  }

  public int getRemainingTables() {
    return _remainingTables;
  }

  public void setRemainingTables(int remainingTables) {
    _remainingTables = remainingTables;
  }

  public long getStartTimeMs() {
    return _startTimeMs;
  }

  public void setStartTimeMs(long startTimeMs) {
    _startTimeMs = startTimeMs;
  }

  public long getTimeToFinishInSeconds() {
    return _timeToFinishInSeconds;
  }

  public void setTimeToFinishInSeconds(long timeToFinishInSeconds) {
    _timeToFinishInSeconds = timeToFinishInSeconds;
  }

  public String getCompletionStatusMsg() {
    return _completionStatusMsg;
  }

  public void setCompletionStatusMsg(String completionStatusMsg) {
    _completionStatusMsg = completionStatusMsg;
  }

  /**
   * Records the status of a table, replacing any previous value.
   *
   * @param tableName table to update
   * @param status new status (a TableStatus name or a free-form status string)
   */
  public void updateTableStatus(String tableName, String status) {
    _tableStatusMap.put(tableName, status);
  }

  /**
   * Records the id of the table-level rebalance job started for a table.
   *
   * @param tableName table the job belongs to
   * @param jobId table rebalance job id
   */
  public void putTableRebalanceJobId(String tableName, String jobId) {
    _tableRebalanceJobIdMap.put(tableName, jobId);
  }

  public Map<String, String> getTableRebalanceJobIdMap() {
    return _tableRebalanceJobIdMap;
  }

  /** Lifecycle states of a single table within a tenant rebalance. */
  public enum TableStatus {
    UNPROCESSED, PROCESSING, PROCESSED
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public class TenantRebalanceResult { + private String _jobId; + private Map<String, RebalanceResult> _rebalanceTableResults; + + public TenantRebalanceResult(String jobId, Map<String, RebalanceResult> rebalanceTableResults, boolean verbose) { + _jobId = jobId; + if (verbose) { + _rebalanceTableResults = rebalanceTableResults; + } else { + _rebalanceTableResults = new HashMap<>(); + rebalanceTableResults.forEach((table, result) -> { + _rebalanceTableResults.put(table, new RebalanceResult(result.getJobId(), result.getStatus(), + result.getDescription(), null, null, null)); + }); + } + } + + public String getJobId() { + return _jobId; + } + + public Map<String, RebalanceResult> getRebalanceTableResults() { + return _rebalanceTableResults; + } + + public void setJobId(String jobId) { + _jobId = jobId; + } + + public void setRebalanceTableResults(Map<String, RebalanceResult> rebalanceTableResults) { + _rebalanceTableResults = rebalanceTableResults; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java similarity index 82% copy from 
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java copy to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java index aaad454787..53df7824d5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.common.metadata.controllerjob; +package org.apache.pinot.controller.helix.core.rebalance.tenant; -public enum ControllerJobType { - RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE + +public interface TenantRebalancer { + TenantRebalanceResult rebalance(TenantRebalanceContext context); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java new file mode 100644 index 0000000000..7521caa3f3 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.RebalanceConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTenantRebalanceObserver.class); + + private final PinotHelixResourceManager _pinotHelixResourceManager; + private final String _jobId; + private final String _tenantName; + private final List<String> _unprocessedTables; + private final TenantRebalanceProgressStats _progressStats; + // Keep track of number of updates. Useful during debugging. 
+ private int _numUpdatesToZk; + + public ZkBasedTenantRebalanceObserver(String jobId, String tenantName, Set<String> tables, + PinotHelixResourceManager pinotHelixResourceManager) { + Preconditions.checkState(tables != null && !tables.isEmpty(), "List of tables to observe is empty."); + _jobId = jobId; + _tenantName = tenantName; + _unprocessedTables = new ArrayList<>(tables); + _pinotHelixResourceManager = pinotHelixResourceManager; + _progressStats = new TenantRebalanceProgressStats(tables); + _numUpdatesToZk = 0; + } + + @Override + public void onTrigger(Trigger trigger, String tableName, String description) { + switch (trigger) { + case START_TRIGGER: + _progressStats.setStartTimeMs(System.currentTimeMillis()); + break; + case REBALANCE_STARTED_TRIGGER: + _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSING.name()); + _progressStats.putTableRebalanceJobId(tableName, description); + break; + case REBALANCE_COMPLETED_TRIGGER: + _progressStats.updateTableStatus(tableName, TenantRebalanceProgressStats.TableStatus.PROCESSED.name()); + _unprocessedTables.remove(tableName); + _progressStats.setRemainingTables(_unprocessedTables.size()); + break; + case REBALANCE_ERRORED_TRIGGER: + _progressStats.updateTableStatus(tableName, description); + _unprocessedTables.remove(tableName); + _progressStats.setRemainingTables(_unprocessedTables.size()); + break; + default: + } + trackStatsInZk(); + } + + @Override + public void onSuccess(String msg) { + _progressStats.setCompletionStatusMsg(msg); + _progressStats.setTimeToFinishInSeconds((System.currentTimeMillis() - _progressStats.getStartTimeMs()) / 1000); + trackStatsInZk(); + } + + @Override + public void onError(String errorMsg) { + _progressStats.setCompletionStatusMsg(errorMsg); + _progressStats.setTimeToFinishInSeconds(System.currentTimeMillis() - _progressStats.getStartTimeMs()); + trackStatsInZk(); + } + + private void trackStatsInZk() { + Map<String, String> jobMetadata = new 
HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name()); + try { + jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS, + JsonUtils.objectToString(_progressStats)); + } catch (JsonProcessingException e) { + LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e); + } + _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, + ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TENANT_REBALANCE)); + _numUpdatesToZk++; + LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ", _numUpdatesToZk, _jobId); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java new file mode 100644 index 0000000000..a4237fee3f --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.RebalanceConfigConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class TenantRebalancerTest extends ControllerTest { + + private static final String DEFAULT_TENANT_NAME = "DefaultTenant"; + private static final String TENANT_NAME = "TestTenant"; + private static final String RAW_TABLE_NAME_A = 
"testTableA"; + private static final String OFFLINE_TABLE_NAME_A = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_A); + private static final String RAW_TABLE_NAME_B = "testTableB"; + private static final String OFFLINE_TABLE_NAME_B = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_B); + private static final int NUM_REPLICAS = 3; + ExecutorService _executorService; + + @BeforeClass + public void setUp() + throws Exception { + startZk(); + startController(); + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + _executorService = Executors.newFixedThreadPool(3); + } + + @Test + public void testRebalance() + throws Exception { + int numServers = 3; + for (int i = 0; i < numServers; i++) { + addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true); + } + + TenantRebalancer tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _executorService); + + // tag all servers and brokers to test tenant + addTenantTagToInstances(TENANT_NAME); + + // create 2 tables, one on each of test tenant and default tenant + createTableWithSegments(RAW_TABLE_NAME_A, DEFAULT_TENANT_NAME); + createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME); + + // Add 3 more servers which will be tagged to default tenant + int numServersToAdd = 3; + for (int i = 0; i < numServersToAdd; i++) { + addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + (numServers + i), true); + } + + Map<String, Map<String, String>> oldSegmentAssignment = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields(); + + // rebalance the tables on test tenant + TenantRebalanceContext context = new TenantRebalanceContext(); + context.setTenantName(TENANT_NAME); + context.setVerboseResult(true); + TenantRebalanceResult result = tenantRebalancer.rebalance(context); + RebalanceResult rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B); + Map<String, Map<String, String>> rebalancedAssignment 
= rebalanceResult.getSegmentAssignment(); + // assignment should not change, with a NO_OP status as no now server is added to test tenant + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + assertEquals(oldSegmentAssignment, rebalancedAssignment); + + // rebalance the tables on default tenant + context.setTenantName(DEFAULT_TENANT_NAME); + result = tenantRebalancer.rebalance(context); + // rebalancing default tenant should distribute the segment of table A over 6 servers + rebalanceResult = result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A); + InstancePartitions partitions = rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE); + assertEquals(partitions.getPartitionToInstancesMap().get("0_0").size(), 6); + + // ensure the ideal state and external view converges + assertTrue(waitForCompletion(result.getJobId())); + TenantRebalanceProgressStats progressStats = getProgress(result.getJobId()); + assertTrue(progressStats.getTableRebalanceJobIdMap().containsKey(OFFLINE_TABLE_NAME_A)); + assertEquals(progressStats.getTableStatusMap().get(OFFLINE_TABLE_NAME_A), + TenantRebalanceProgressStats.TableStatus.PROCESSED.name()); + Map<String, Map<String, String>> idealState = + _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_A).getRecord().getMapFields(); + Map<String, Map<String, String>> externalView = + _helixResourceManager.getTableExternalView(OFFLINE_TABLE_NAME_A).getRecord().getMapFields(); + assertEquals(idealState, externalView); + } + + private boolean waitForCompletion(String jobId) { + int retries = 5; + while (retries > 0) { + try { + TenantRebalanceProgressStats stats = getProgress(jobId); + if (stats != null && stats.getRemainingTables() == 0) { + return true; + } + retries--; + Thread.sleep(2000); + } catch (JsonProcessingException | InterruptedException e) { + return false; + } + } + return false; + } + + private TenantRebalanceProgressStats getProgress(String jobId) + throws 
JsonProcessingException { + Map<String, String> controllerJobZKMetadata = + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE); + if (controllerJobZKMetadata == null) { + return null; + } + return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS), + TenantRebalanceProgressStats.class); + } + + private void createTableWithSegments(String rawTableName, String tenant) + throws IOException { + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName) + .setServerTenant(tenant).setBrokerTenant(tenant).setNumReplicas(NUM_REPLICAS).build(); + // Create the table + _helixResourceManager.addTable(tableConfig); + // Add the segments + int numSegments = 10; + String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); + for (int i = 0; i < numSegments; i++) { + _helixResourceManager.addNewSegment(offlineTableName, + SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName, "segment_" + i), null); + } + } + + private void addTenantTagToInstances(String testTenant) { + String offlineTag = TagNameUtils.getOfflineTagForTenant(testTenant); + String brokerTag = TagNameUtils.getBrokerTagForTenant(testTenant); + _helixResourceManager.getAllInstances().forEach(instance -> { + List<String> existingTags = _helixResourceManager.getHelixInstanceConfig(instance).getTags(); + if (instance.startsWith(SERVER_INSTANCE_ID_PREFIX)) { + existingTags.add(offlineTag); + } else if (instance.startsWith(BROKER_INSTANCE_ID_PREFIX)) { + existingTags.add(brokerTag); + } + _helixResourceManager.updateInstanceTags(instance, String.join(",", existingTags), true); + }); + } + + @AfterClass + public void tearDown() { + stopFakeInstances(); + stopController(); + stopZk(); + _executorService.shutdown(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
index 5c1516210d..e72d066bc9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -80,6 +80,7 @@ public class Actions { public static final String UPDATE_TASK_QUEUE = "UpdateTaskQueue"; public static final String UPDATE_TENANT = "UpdateTenant"; public static final String UPDATE_TENANT_METADATA = "UpdateTenantMetadata"; + public static final String REBALANCE_TENANT_TABLES = "RebalanceTenantTables"; public static final String UPDATE_TIME_INTERVAL = "UpdateTimeInterval"; public static final String UPDATE_USER = "UpdateUser"; public static final String UPDATE_ZNODE = "UpdateZnode"; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 180744a831..46b2c5148e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -754,6 +754,7 @@ public class CommonConstants { */ public static final String JOB_TYPE = "jobType"; public static final String TABLE_NAME_WITH_TYPE = "tableName"; + public static final String TENANT_NAME = "tenantName"; public static final String JOB_ID = "jobId"; public static final String SUBMISSION_TIME_MS = "submissionTimeMs"; public static final String MESSAGE_COUNT = "messageCount"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org