This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f81b03c204 Refactor ControllerJobType enum into extensible interface (#16106) f81b03c204 is described below commit f81b03c20493c9dd8ed818c1706262eb2dd64797 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Mon Jun 16 04:31:35 2025 +0100 Refactor ControllerJobType enum into extensible interface (#16106) --- .../pinot/controller/BaseControllerStarter.java | 4 +- .../apache/pinot/controller/ControllerConf.java | 10 ++-- .../api/resources/PinotRealtimeTableResource.java | 6 +-- .../api/resources/PinotSegmentRestletResource.java | 4 +- .../api/resources/PinotTableRestletResource.java | 14 +++--- .../api/resources/PinotTenantRestletResource.java | 4 +- .../helix/core/PinotHelixResourceManager.java | 18 ++++---- ...trollerJobType.java => ControllerJobTypes.java} | 22 ++++----- .../helix/core/rebalance/RebalanceChecker.java | 8 ++-- .../core/rebalance/TableRebalanceManager.java | 9 ++-- .../rebalance/ZkBasedTableRebalanceObserver.java | 6 +-- .../tenant/ZkBasedTenantRebalanceObserver.java | 6 +-- .../helix/core/util/ControllerZkHelixUtils.java | 6 +-- .../helix/core/rebalance/RebalanceCheckerTest.java | 16 +++---- .../rebalance/tenant/TenantRebalancerTest.java | 4 +- .../core/util/ControllerZkHelixUtilsTest.java | 30 ++++++------ .../tests/LLCRealtimeClusterIntegrationTest.java | 4 +- .../tests/TableRebalanceIntegrationTest.java | 28 +++++------ .../pinot/spi/controller/ControllerJobType.java | 54 ++++++++++++++++++++++ .../apache/pinot/spi/utils/CommonConstants.java | 2 + 20 files changed, 154 insertions(+), 101 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 346fcc7b24..8ab5c6ac27 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -94,7 +94,7 @@ import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; @@ -281,7 +281,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled()); ContinuousJfrStarter.init(_config); - ControllerJobType.init(_config); + ControllerJobTypes.init(_config); } /// Returns the default cluster configs to be stored in ZK as Helix cluster config. These configs will then be diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 03f4c9bca7..4b64b2abf0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -41,6 +41,7 @@ import org.apache.pinot.spi.utils.TimeUtils; import static org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX; import static org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_INSTANCE_ID; import static org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX; +import static org.apache.pinot.spi.utils.CommonConstants.ControllerJob; public class ControllerConf extends PinotConfiguration { @@ -383,7 +384,6 @@ public class ControllerConf extends PinotConfiguration { public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK = "controller.tenant.rebalance.maxJobsInZK"; public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK = "controller.reload.segment.maxJobsInZK"; public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK = "controller.force.commit.maxJobsInZK"; - public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100; private final Map<String, String> _invalidConfigs = new ConcurrentHashMap<>(); @@ -1319,18 +1319,18 @@ public class ControllerConf extends PinotConfiguration { } public int getMaxTableRebalanceZkJobs() { - return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK, DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); } public int getMaxTenantRebalanceZkJobs() { - return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK, DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); } public int getMaxReloadSegmentZkJobs() { - return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK, DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); } public int getMaxForceCommitZkJobs() { - return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK, DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK, ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index a07e952da6..e4e5a0d17d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -54,7 +54,7 @@ import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; import org.apache.pinot.core.auth.Actions; @@ -233,7 +233,7 @@ public class PinotRealtimeTableResource { throws Exception { Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId, - ControllerJobType.FORCE_COMMIT); + ControllerJobTypes.FORCE_COMMIT); if (controllerJobZKMetadata == null) { throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + forceCommitJobId, Response.Status.NOT_FOUND); @@ -259,7 +259,7 @@ public class PinotRealtimeTableResource { controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST, JsonUtils.objectToString(segmentsYetToBeCommitted)); _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, controllerJobZKMetadata, - ControllerJobType.FORCE_COMMIT); + ControllerJobTypes.FORCE_COMMIT); } Map<String, Object> result = new HashMap<>(controllerJobZKMetadata); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 560724fcc2..f00a1dcd7b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -86,7 +86,7 @@ import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.util.CompletionServiceHelper; import org.apache.pinot.controller.util.TableMetadataReader; import org.apache.pinot.controller.util.TableTierReader; @@ -533,7 +533,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId) throws Exception { Map<String, String> controllerJobZKMetadata = - _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, ControllerJobType.RELOAD_SEGMENT); + _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, ControllerJobTypes.RELOAD_SEGMENT); if (controllerJobZKMetadata == null) { throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId, Status.NOT_FOUND); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 38a781677f..71a27d325a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -95,7 +95,7 @@ import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.api.exception.TableAlreadyExistsException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; @@ -117,6 +117,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableStatsHumanReadable; import org.apache.pinot.spi.config.table.TableStatus; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.controller.ControllerJobType; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; @@ -767,7 +768,7 @@ public class PinotTableRestletResource { } public Map<String, String> getControllerJobMetadata(String jobId) { - return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TABLE_REBALANCE); + return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TABLE_REBALANCE); } @DELETE @@ -1131,12 +1132,13 @@ public class PinotTableRestletResource { List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); - EnumSet<ControllerJobType> jobTypesToFilter = null; + Set<ControllerJobType> jobTypesToFilter = null; if (StringUtils.isNotEmpty(jobTypesString)) { + jobTypesToFilter = new HashSet<>(); for (String jobTypeStr : StringUtils.split(jobTypesString, ',')) { - ControllerJobType jobType; + ControllerJobTypes jobType; try { - jobType = ControllerJobType.valueOf(jobTypeStr.toUpperCase()); + jobType = ControllerJobTypes.valueOf(jobTypeStr.toUpperCase()); } catch (IllegalArgumentException e) { throw new ControllerApplicationException(LOGGER, "Unknown job type: " + jobTypeStr, Response.Status.BAD_REQUEST); @@ -1147,7 +1149,7 @@ public class PinotTableRestletResource { Map<String, Map<String, String>> result = new HashMap<>(); for (String tableNameWithType : tableNamesWithType) { result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null - ? EnumSet.allOf(ControllerJobType.class) : jobTypesToFilter, + ? new HashSet<>(EnumSet.allOf(ControllerJobTypes.class)) : jobTypesToFilter, jobMetadata -> jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE) .equals(tableNameWithType))); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index 45bee25da1..e2175e7577 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -61,7 +61,7 @@ import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats; @@ -707,7 +707,7 @@ public class PinotTenantRestletResource { @ApiParam(value = "Tenant rebalance job id", required = true) @PathParam("jobId") String jobId) throws JsonProcessingException { Map<String, String> controllerJobZKMetadata = - _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE); + _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TENANT_REBALANCE); if (controllerJobZKMetadata == null) { throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + jobId, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ead9b9d712..d377ed190f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -37,7 +37,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -150,7 +149,7 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; @@ -174,6 +173,7 @@ import org.apache.pinot.spi.config.user.ComponentType; import org.apache.pinot.spi.config.user.RoleType; import org.apache.pinot.spi.config.user.UserConfig; import org.apache.pinot.spi.config.workload.QueryWorkloadConfig; +import org.apache.pinot.spi.controller.ControllerJobType; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.LogicalTableConfig; import org.apache.pinot.spi.data.Schema; @@ -2405,7 +2405,7 @@ public class PinotHelixResourceManager { * Returns a Map of jobId to job's ZK metadata that passes the checker, like for specific tables. * @return A Map of jobId to job properties */ - public Map<String, Map<String, String>> getAllJobs(EnumSet<ControllerJobType> jobTypes, + public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes, Predicate<Map<String, String>> jobMetadataChecker) { return ControllerZkHelixUtils.getAllControllerJobs(jobTypes, jobMetadataChecker, _propertyStore); } @@ -2425,14 +2425,14 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.RELOAD_SEGMENT.name()); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentNames); if (instanceName != null) { jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName); } - return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.RELOAD_SEGMENT); } /** @@ -2449,13 +2449,13 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.RELOAD_SEGMENT.name()); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); if (instanceName != null) { jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName); } - return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.RELOAD_SEGMENT); } public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, @@ -2464,11 +2464,11 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.FORCE_COMMIT.name()); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, JsonUtils.objectToString(consumingSegmentsCommitted)); - return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobTypes.FORCE_COMMIT); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java similarity index 83% rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java index c4cd936d56..18e67c69a1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java @@ -26,6 +26,8 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats; +import org.apache.pinot.spi.controller.ControllerJobType; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +36,18 @@ import org.slf4j.LoggerFactory; /** * Controller jobs that store metadata in the ZK property store. */ -public enum ControllerJobType { +public enum ControllerJobTypes implements ControllerJobType { RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE; - private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJobType.class); - private static final EnumMap<ControllerJobType, Integer> ZK_NUM_JOBS_LIMIT = new EnumMap<>(ControllerJobType.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJobTypes.class); + private static final EnumMap<ControllerJobTypes, Integer> ZK_NUM_JOBS_LIMIT = new EnumMap<>(ControllerJobTypes.class); - /** - * Gets the maximum number of job metadata entries that can be stored in ZK for this job type. - */ + @Override public Integer getZkNumJobsLimit() { - return ZK_NUM_JOBS_LIMIT.getOrDefault(this, ControllerConf.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); + return ZK_NUM_JOBS_LIMIT.getOrDefault(this, CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK); } public static void init(ControllerConf controllerConf) { @@ -57,13 +57,7 @@ public enum ControllerJobType { ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE, controllerConf.getMaxTenantRebalanceZkJobs()); } - /** - * Checks if the job metadata entry can be safely deleted. Note that the job metadata entry will only be attempted - * to be deleted when the number of entries in the job metadata map exceeds the configured limit for the job type. - * - * @param jobMetadataEntry The job metadata entry to check - a pair of job ID and job metadata map - * @return true if the job metadata entry can be safely deleted, false otherwise - */ + @Override public boolean canDelete(Pair<String, Map<String, String>> jobMetadataEntry) { switch (this) { case TABLE_REBALANCE: diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java index e7593bfdea..5c73cce4c5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java @@ -21,7 +21,6 @@ package org.apache.pinot.controller.helix.core.rebalance; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.CommonConstants; @@ -83,7 +82,7 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { private synchronized int retryRebalanceTables(Set<String> tableNamesWithType) { // Get all jobMetadata for all the given tables with a single ZK read. Map<String, Map<String, String>> allJobMetadataByJobId = - _pinotHelixResourceManager.getAllJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE), + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE), jobMetadata -> tableNamesWithType.contains( jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE))); Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new HashMap<>(); @@ -216,7 +215,8 @@ public class RebalanceChecker extends ControllerPeriodicTask<Void> { } private static void abortExistingJobs(String tableNameWithType, PinotHelixResourceManager pinotHelixResourceManager) { - boolean updated = pinotHelixResourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE, + boolean updated = + pinotHelixResourceManager.updateJobsForTable(tableNameWithType, ControllerJobTypes.TABLE_REBALANCE, jobMetadata -> { String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID); try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java index 0747ce7208..1536186dde 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,7 +37,7 @@ import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableConfig; @@ -210,7 +209,7 @@ public class TableRebalanceManager { */ public List<String> cancelRebalance(String tableNameWithType) { List<String> cancelledJobIds = new ArrayList<>(); - boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, ControllerJobType.TABLE_REBALANCE, + boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, ControllerJobTypes.TABLE_REBALANCE, jobMetadata -> { String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID); try { @@ -246,7 +245,7 @@ public class TableRebalanceManager { public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId) throws JsonProcessingException { Map<String, String> controllerJobZKMetadata = - _resourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TABLE_REBALANCE); + _resourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TABLE_REBALANCE); if (controllerJobZKMetadata == null) { LOGGER.warn("Rebalance job with ID: {} not found", jobId); throw new NotFoundException("Rebalance job with ID: " + jobId + " not found"); @@ -292,7 +291,7 @@ public class TableRebalanceManager { public static String rebalanceJobInProgress(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore) { // Get all jobMetadata for the given table with a single ZK read. Map<String, Map<String, String>> allJobMetadataByJobId = - ControllerZkHelixUtils.getAllControllerJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE), + ControllerZkHelixUtils.getAllControllerJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE), jobMetadata -> tableNameWithType.equals( jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)), propertyStore); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index bd30e98168..32fedd509f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -29,7 +29,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -282,7 +282,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { Map<String, String> jobMetadata = createJobMetadata(_tableNameWithType, _rebalanceJobId, _tableRebalanceProgressStats, _tableRebalanceContext); ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, _rebalanceJobId, jobMetadata, - ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> { + ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> { // In addition to updating job progress status, the observer also checks if the job status is IN_PROGRESS. // If not, then no need to update the job status, and we keep this status to end the job promptly. if (prevJobMetadata == null) { @@ -318,7 +318,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name()); try { jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(tableRebalanceProgressStats)); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java index 6513e47a2f..0d89a94d54 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -100,14 +100,14 @@ public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver { jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TENANT_REBALANCE.name()); try { jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(_progressStats)); } catch (JsonProcessingException e) { LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", _jobId, e); } - _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobType.TENANT_REBALANCE); + _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, ControllerJobTypes.TENANT_REBALANCE); _numUpdatesToZk++; LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {} ", _numUpdatesToZk, _jobId); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java index de1e8d6aa9..04b661af1e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java @@ -21,11 +21,11 @@ package org.apache.pinot.controller.helix.core.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Comparator; -import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; @@ -33,7 +33,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.spi.controller.ControllerJobType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -90,7 +90,7 @@ public class ControllerZkHelixUtils { * @param propertyStore the ZK property store to read from * @return a map of jobId to job metadata for all the jobs that match the given job types and metadata checker */ - public static Map<String, Map<String, String>> getAllControllerJobs(EnumSet<ControllerJobType> jobTypes, + public static Map<String, Map<String, String>> getAllControllerJobs(Set<ControllerJobType> jobTypes, Predicate<Map<String, String>> jobMetadataChecker, ZkHelixPropertyStore<ZNRecord> propertyStore) { Map<String, Map<String, String>> controllerJobs = new HashMap<>(); for (ControllerJobType jobType : jobTypes) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java index b424fc7193..fbf3b2d6b9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java @@ -40,7 +40,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -370,7 +370,7 @@ public class RebalanceCheckerTest { HelixManager helixZkManager = mock(HelixManager.class); ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); String zkPath = - ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE.name()); + ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobTypes.TABLE_REBALANCE.name()); ZNRecord jobsZnRecord = new ZNRecord("jobs"); when(propertyStore.get(eq(zkPath), any(), eq(AccessOption.PERSISTENT))).thenReturn(jobsZnRecord); when(helixZkManager.getClusterManagmentTool()).thenReturn(mock(HelixAdmin.class)); @@ -380,16 +380,16 @@ public class RebalanceCheckerTest { pinotHelixManager.addControllerJobToZK("job1", ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000", "tableName", "table01"), - ControllerJobType.TABLE_REBALANCE, jmd -> true); + ControllerJobTypes.TABLE_REBALANCE, jmd -> true); pinotHelixManager.addControllerJobToZK("job2", ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000", "tableName", "table01"), - ControllerJobType.TABLE_REBALANCE, jmd -> false); + ControllerJobTypes.TABLE_REBALANCE, jmd -> false); pinotHelixManager.addControllerJobToZK("job3", ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000", "tableName", "table02"), - ControllerJobType.TABLE_REBALANCE, jmd -> true); + ControllerJobTypes.TABLE_REBALANCE, jmd -> true); pinotHelixManager.addControllerJobToZK("job4", ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000", "tableName", "table02"), - ControllerJobType.TABLE_REBALANCE, jmd -> true); + ControllerJobTypes.TABLE_REBALANCE, jmd -> true); Map<String, Map<String, String>> jmds = jobsZnRecord.getMapFields(); assertEquals(jmds.size(), 3); assertTrue(jmds.containsKey("job1")); @@ -397,13 +397,13 @@ public class RebalanceCheckerTest { assertTrue(jmds.containsKey("job4")); Set<String> expectedJobs01 = new HashSet<>(); - pinotHelixManager.updateJobsForTable("table01", ControllerJobType.TABLE_REBALANCE, + pinotHelixManager.updateJobsForTable("table01", ControllerJobTypes.TABLE_REBALANCE, jmd -> expectedJobs01.add(jmd.get("jobId"))); assertEquals(expectedJobs01.size(), 1); assertTrue(expectedJobs01.contains("job1")); Set<String> expectedJobs02 = new HashSet<>(); - pinotHelixManager.updateJobsForTable("table02", ControllerJobType.TABLE_REBALANCE, + pinotHelixManager.updateJobsForTable("table02", ControllerJobTypes.TABLE_REBALANCE, jmd -> expectedJobs02.add(jmd.get("jobId"))); assertEquals(expectedJobs02.size(), 2); assertTrue(expectedJobs02.contains("job3")); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java index db23d24121..9a2db888a1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java @@ -28,7 +28,7 @@ import java.util.concurrent.Executors; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.helix.ControllerTest; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; @@ -149,7 +149,7 @@ public class TenantRebalancerTest extends ControllerTest { private TenantRebalanceProgressStats getProgress(String jobId) throws JsonProcessingException { Map<String, String> controllerJobZKMetadata = - _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.TENANT_REBALANCE); + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TENANT_REBALANCE); if (controllerJobZKMetadata == null) { return null; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java index 9aee5e9e11..70ea1ea5f8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java @@ -21,7 +21,7 @@ package org.apache.pinot.controller.helix.core.util; import java.util.Map; import java.util.Set; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; @@ -42,7 +42,7 @@ public class ControllerZkHelixUtilsTest { // Setup job limits ControllerConf controllerConf = mock(ControllerConf.class); when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2); - ControllerJobType.init(controllerConf); + ControllerJobTypes.init(controllerConf); TableRebalanceProgressStats inProgressStats = new TableRebalanceProgressStats(); inProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS); @@ -53,24 +53,24 @@ public class ControllerZkHelixUtilsTest { Map<String, Map<String, String>> jobMetadataMap = Map.of( "job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "1000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(inProgressStats)), "job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(completedStats)), "job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "2000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(abortedStats)), "job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "4000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(inProgressStats)), "job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "5000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(inProgressStats)) ); Map<String, Map<String, String>> updatedJobMetadataMap = - ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, ControllerJobType.TABLE_REBALANCE); + ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, ControllerJobTypes.TABLE_REBALANCE); // Even though the limit is 2, we should not delete the in-progress jobs assertEquals(updatedJobMetadataMap.size(), 3); assertEquals(updatedJobMetadataMap.keySet(), Set.of("job1", "job4", "job5")); @@ -82,7 +82,7 @@ public class ControllerZkHelixUtilsTest { // Setup job limits ControllerConf controllerConf = mock(ControllerConf.class); when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2); - ControllerJobType.init(controllerConf); + ControllerJobTypes.init(controllerConf); TableRebalanceProgressStats completedStats = new TableRebalanceProgressStats(); completedStats.setStatus(RebalanceResult.Status.DONE); @@ -91,24 +91,24 @@ public class ControllerZkHelixUtilsTest { Map<String, Map<String, String>> jobMetadataMap = Map.of( "job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "1000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(completedStats)), "job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "5000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(completedStats)), "job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "3000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(abortedStats)), "job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "2000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(completedStats)), "job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "4000", - CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name(), + CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name(), RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(abortedStats)) ); Map<String, Map<String, String>> updatedJobMetadataMap = - ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, ControllerJobType.TABLE_REBALANCE); + ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, ControllerJobTypes.TABLE_REBALANCE); assertEquals(updatedJobMetadataMap.size(), 2); // Retain the two most recent jobs based on submission time assertEquals(updatedJobMetadataMap.keySet(), Set.of("job2", "job5")); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index eca80e5017..1d786400ea 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -44,7 +44,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch; import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory; import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer; @@ -457,7 +457,7 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr private void testForceCommitInternal(String realtimeTableName, String jobId, Set<String> consumingSegments, long timeoutMs) { Map<String, String> jobMetadata = - _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobType.FORCE_COMMIT); + _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.FORCE_COMMIT); assertNotNull(jobMetadata); assertNotNull(jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java index 85dc9fd79b..a41d25ab41 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java @@ -38,7 +38,7 @@ import org.apache.pinot.common.utils.regex.Matcher; import org.apache.pinot.common.utils.regex.Pattern; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse; -import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; @@ -1291,10 +1291,10 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name()); jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(progressStats)); - ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, + ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true); // Add a new server (to force change in instance assignment) and enable reassignInstances to ensure that the @@ -1313,7 +1313,7 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT progressStats.setStatus(RebalanceResult.Status.DONE); jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(progressStats)); - ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE, + ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true); // Stop the added server @@ -1335,15 +1335,16 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT String inProgressJobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, inProgressJobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "1000"); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobTypes.TABLE_REBALANCE.name()); TableRebalanceProgressStats progressStats = new TableRebalanceProgressStats(); progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS); jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(progressStats)); ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, inProgressJobId, jobMetadata, - ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true); + ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true); - assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId, ControllerJobType.TABLE_REBALANCE)); + assertNotNull( + _helixResourceManager.getControllerJobZKMetadata(inProgressJobId, ControllerJobTypes.TABLE_REBALANCE)); // Add a DONE rebalance String doneJobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); @@ -1354,9 +1355,9 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT JsonUtils.objectToString(progressStats)); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, String.valueOf(System.currentTimeMillis())); ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, doneJobId, jobMetadata, - ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true); + ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true); - assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, ControllerJobType.TABLE_REBALANCE)); + assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, ControllerJobTypes.TABLE_REBALANCE)); // Add another DONE rebalance jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, "anotherTable_REALTIME"); @@ -1365,16 +1366,17 @@ public class TableRebalanceIntegrationTest extends BaseHybridClusterIntegrationT String.valueOf(System.currentTimeMillis() + 1000)); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, anotherDoneJobId); ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, anotherDoneJobId, jobMetadata, - ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true); + ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true); assertNotNull( - _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId, ControllerJobType.TABLE_REBALANCE)); + _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId, ControllerJobTypes.TABLE_REBALANCE)); // Verify that the first DONE job is cleaned up - assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, ControllerJobType.TABLE_REBALANCE)); + assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, ControllerJobTypes.TABLE_REBALANCE)); // Verify that the in-progress job is still there even though it has the oldest submission time - assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId, ControllerJobType.TABLE_REBALANCE)); + assertNotNull( + _helixResourceManager.getControllerJobZKMetadata(inProgressJobId, ControllerJobTypes.TABLE_REBALANCE)); } private String getReloadJobIdFromResponse(String response) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java new file mode 100644 index 0000000000..822c7d9c3e --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.controller; + +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * Interface for controller job types that store metadata in the ZK property store. + */ +public interface ControllerJobType { + + /** + * Name of the controller job type, which is used in the ZK property store path for storing job metadata for jobs + * of this type. + */ + String name(); + + /** + * Gets the maximum number of job metadata entries that can be stored in ZK for this job type. + */ + default Integer getZkNumJobsLimit() { + return CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK; + } + + /** + * Checks if the job metadata entry can be safely deleted. Note that the job metadata entry will only be attempted + * to be deleted when the number of entries in the job metadata map exceeds the configured limit for the job type. + * + * @param jobMetadataEntry The job metadata entry to check - a pair of job ID and job metadata map + * @return true if the job metadata entry can be safely deleted, false otherwise + */ + default boolean canDelete(Pair<String, Map<String, String>> jobMetadataEntry) { + return true; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 14a81ba0a1..611f301f67 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1307,6 +1307,8 @@ public class CommonConstants { public static final String SUBMISSION_TIME_MS = "submissionTimeMs"; public static final String MESSAGE_COUNT = "messageCount"; + public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100; + /** * Segment reload job ZK props */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org