This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f81b03c204 Refactor ControllerJobType enum into extensible interface (#16106)
f81b03c204 is described below

commit f81b03c20493c9dd8ed818c1706262eb2dd64797
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Mon Jun 16 04:31:35 2025 +0100

    Refactor ControllerJobType enum into extensible interface (#16106)
---
 .../pinot/controller/BaseControllerStarter.java    |  4 +-
 .../apache/pinot/controller/ControllerConf.java    | 10 ++--
 .../api/resources/PinotRealtimeTableResource.java  |  6 +--
 .../api/resources/PinotSegmentRestletResource.java |  4 +-
 .../api/resources/PinotTableRestletResource.java   | 14 +++---
 .../api/resources/PinotTenantRestletResource.java  |  4 +-
 .../helix/core/PinotHelixResourceManager.java      | 18 ++++----
 ...trollerJobType.java => ControllerJobTypes.java} | 22 ++++-----
 .../helix/core/rebalance/RebalanceChecker.java     |  8 ++--
 .../core/rebalance/TableRebalanceManager.java      |  9 ++--
 .../rebalance/ZkBasedTableRebalanceObserver.java   |  6 +--
 .../tenant/ZkBasedTenantRebalanceObserver.java     |  6 +--
 .../helix/core/util/ControllerZkHelixUtils.java    |  6 +--
 .../helix/core/rebalance/RebalanceCheckerTest.java | 16 +++----
 .../rebalance/tenant/TenantRebalancerTest.java     |  4 +-
 .../core/util/ControllerZkHelixUtilsTest.java      | 30 ++++++------
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  4 +-
 .../tests/TableRebalanceIntegrationTest.java       | 28 +++++------
 .../pinot/spi/controller/ControllerJobType.java    | 54 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  2 +
 20 files changed, 154 insertions(+), 101 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 346fcc7b24..8ab5c6ac27 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -94,7 +94,7 @@ import 
org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter;
@@ -281,7 +281,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     
TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled());
 
     ContinuousJfrStarter.init(_config);
-    ControllerJobType.init(_config);
+    ControllerJobTypes.init(_config);
   }
 
   /// Returns the default cluster configs to be stored in ZK as Helix cluster 
config. These configs will then be
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 03f4c9bca7..4b64b2abf0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.utils.TimeUtils;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_INSTANCE_ID;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX;
+import static org.apache.pinot.spi.utils.CommonConstants.ControllerJob;
 
 
 public class ControllerConf extends PinotConfiguration {
@@ -383,7 +384,6 @@ public class ControllerConf extends PinotConfiguration {
   public static final String CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK = 
"controller.tenant.rebalance.maxJobsInZK";
   public static final String CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK = 
"controller.reload.segment.maxJobsInZK";
   public static final String CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK = 
"controller.force.commit.maxJobsInZK";
-  public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100;
 
   private final Map<String, String> _invalidConfigs = new 
ConcurrentHashMap<>();
 
@@ -1319,18 +1319,18 @@ public class ControllerConf extends PinotConfiguration {
   }
 
   public int getMaxTableRebalanceZkJobs() {
-    return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK, 
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+    return getProperty(CONFIG_OF_MAX_TABLE_REBALANCE_JOBS_IN_ZK, 
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
   }
 
   public int getMaxTenantRebalanceZkJobs() {
-    return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK, 
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+    return getProperty(CONFIG_OF_MAX_TENANT_REBALANCE_JOBS_IN_ZK, 
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
   }
 
   public int getMaxReloadSegmentZkJobs() {
-    return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK, 
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+    return getProperty(CONFIG_OF_MAX_RELOAD_SEGMENT_JOBS_IN_ZK, 
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
   }
 
   public int getMaxForceCommitZkJobs() {
-    return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK, 
DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+    return getProperty(CONFIG_OF_MAX_FORCE_COMMIT_JOBS_IN_ZK, 
ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index a07e952da6..e4e5a0d17d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -54,7 +54,7 @@ import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.core.auth.Actions;
@@ -233,7 +233,7 @@ public class PinotRealtimeTableResource {
       throws Exception {
     Map<String, String> controllerJobZKMetadata =
         _pinotHelixResourceManager.getControllerJobZKMetadata(forceCommitJobId,
-            ControllerJobType.FORCE_COMMIT);
+            ControllerJobTypes.FORCE_COMMIT);
     if (controllerJobZKMetadata == null) {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + forceCommitJobId,
           Response.Status.NOT_FOUND);
@@ -259,7 +259,7 @@ public class PinotRealtimeTableResource {
       
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
           JsonUtils.objectToString(segmentsYetToBeCommitted));
       _pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId, 
controllerJobZKMetadata,
-          ControllerJobType.FORCE_COMMIT);
+          ControllerJobTypes.FORCE_COMMIT);
     }
 
     Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 560724fcc2..f00a1dcd7b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -86,7 +86,7 @@ import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.controller.util.TableMetadataReader;
 import org.apache.pinot.controller.util.TableTierReader;
@@ -533,7 +533,7 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") 
String reloadJobId)
       throws Exception {
     Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobType.RELOAD_SEGMENT);
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
     if (controllerJobZKMetadata == null) {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + reloadJobId,
           Status.NOT_FOUND);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 38a781677f..71a27d325a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -95,7 +95,7 @@ import 
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
@@ -117,6 +117,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
 import org.apache.pinot.spi.config.table.TableStatus;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -767,7 +768,7 @@ public class PinotTableRestletResource {
   }
 
   public Map<String, String> getControllerJobMetadata(String jobId) {
-    return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TABLE_REBALANCE);
+    return _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TABLE_REBALANCE);
   }
 
   @DELETE
@@ -1131,12 +1132,13 @@ public class PinotTableRestletResource {
     List<String> tableNamesWithType =
         
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableTypeFromRequest,
             LOGGER);
-    EnumSet<ControllerJobType> jobTypesToFilter = null;
+    Set<ControllerJobType> jobTypesToFilter = null;
     if (StringUtils.isNotEmpty(jobTypesString)) {
+      jobTypesToFilter = new HashSet<>();
       for (String jobTypeStr : StringUtils.split(jobTypesString, ',')) {
-        ControllerJobType jobType;
+        ControllerJobTypes jobType;
         try {
-          jobType = ControllerJobType.valueOf(jobTypeStr.toUpperCase());
+          jobType = ControllerJobTypes.valueOf(jobTypeStr.toUpperCase());
         } catch (IllegalArgumentException e) {
           throw new ControllerApplicationException(LOGGER, "Unknown job type: 
" + jobTypeStr,
               Response.Status.BAD_REQUEST);
@@ -1147,7 +1149,7 @@ public class PinotTableRestletResource {
     Map<String, Map<String, String>> result = new HashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
       result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == 
null
-              ? EnumSet.allOf(ControllerJobType.class) : jobTypesToFilter,
+              ? new HashSet<>(EnumSet.allOf(ControllerJobTypes.class)) : 
jobTypesToFilter,
           jobMetadata -> 
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
               .equals(tableNameWithType)));
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 45bee25da1..e2175e7577 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -61,7 +61,7 @@ import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
@@ -707,7 +707,7 @@ public class PinotTenantRestletResource {
       @ApiParam(value = "Tenant rebalance job id", required = true) 
@PathParam("jobId") String jobId)
       throws JsonProcessingException {
     Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TENANT_REBALANCE);
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
 
     if (controllerJobZKMetadata == null) {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ead9b9d712..d377ed190f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -37,7 +37,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -150,7 +149,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -174,6 +173,7 @@ import org.apache.pinot.spi.config.user.ComponentType;
 import org.apache.pinot.spi.config.user.RoleType;
 import org.apache.pinot.spi.config.user.UserConfig;
 import org.apache.pinot.spi.config.workload.QueryWorkloadConfig;
+import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -2405,7 +2405,7 @@ public class PinotHelixResourceManager {
    * Returns a Map of jobId to job's ZK metadata that passes the checker, like 
for specific tables.
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> 
getAllJobs(EnumSet<ControllerJobType> jobTypes,
+  public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> 
jobTypes,
       Predicate<Map<String, String>> jobMetadataChecker) {
     return ControllerZkHelixUtils.getAllControllerJobs(jobTypes, 
jobMetadataChecker, _propertyStore);
   }
@@ -2425,14 +2425,14 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.RELOAD_SEGMENT.name());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numMessagesSent));
     
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, 
segmentNames);
     if (instanceName != null) {
       
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, 
instanceName);
     }
-    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.RELOAD_SEGMENT);
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobTypes.RELOAD_SEGMENT);
   }
 
   /**
@@ -2449,13 +2449,13 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.RELOAD_SEGMENT.name());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
     if (instanceName != null) {
       
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, 
instanceName);
     }
-    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.RELOAD_SEGMENT);
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobTypes.RELOAD_SEGMENT);
   }
 
   public boolean addNewForceCommitJob(String tableNameWithType, String jobId, 
long jobSubmissionTimeMs,
@@ -2464,11 +2464,11 @@ public class PinotHelixResourceManager {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.FORCE_COMMIT.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.FORCE_COMMIT.name());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(jobSubmissionTimeMs));
     
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
-    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobType.FORCE_COMMIT);
+    return addControllerJobToZK(jobId, jobMetadata, 
ControllerJobTypes.FORCE_COMMIT);
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
similarity index 83%
rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java
rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
index c4cd936d56..18e67c69a1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobType.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/controllerjob/ControllerJobTypes.java
@@ -26,6 +26,8 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
+import org.apache.pinot.spi.controller.ControllerJobType;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,20 +36,18 @@ import org.slf4j.LoggerFactory;
 /**
  * Controller jobs that store metadata in the ZK property store.
  */
-public enum ControllerJobType {
+public enum ControllerJobTypes implements ControllerJobType {
   RELOAD_SEGMENT,
   FORCE_COMMIT,
   TABLE_REBALANCE,
   TENANT_REBALANCE;
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerJobType.class);
-  private static final EnumMap<ControllerJobType, Integer> ZK_NUM_JOBS_LIMIT = 
new EnumMap<>(ControllerJobType.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerJobTypes.class);
+  private static final EnumMap<ControllerJobTypes, Integer> ZK_NUM_JOBS_LIMIT 
= new EnumMap<>(ControllerJobTypes.class);
 
-  /**
-   * Gets the maximum number of job metadata entries that can be stored in ZK 
for this job type.
-   */
+  @Override
   public Integer getZkNumJobsLimit() {
-    return ZK_NUM_JOBS_LIMIT.getOrDefault(this, 
ControllerConf.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
+    return ZK_NUM_JOBS_LIMIT.getOrDefault(this, 
CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK);
   }
 
   public static void init(ControllerConf controllerConf) {
@@ -57,13 +57,7 @@ public enum ControllerJobType {
     ZK_NUM_JOBS_LIMIT.put(TENANT_REBALANCE, 
controllerConf.getMaxTenantRebalanceZkJobs());
   }
 
-  /**
-   * Checks if the job metadata entry can be safely deleted. Note that the job 
metadata entry will only be attempted
-   * to be deleted when the number of entries in the job metadata map exceeds 
the configured limit for the job type.
-   *
-   * @param jobMetadataEntry The job metadata entry to check - a pair of job 
ID and job metadata map
-   * @return true if the job metadata entry can be safely deleted, false 
otherwise
-   */
+  @Override
   public boolean canDelete(Pair<String, Map<String, String>> jobMetadataEntry) 
{
     switch (this) {
       case TABLE_REBALANCE:
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index e7593bfdea..5c73cce4c5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -83,7 +82,7 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
   private synchronized int retryRebalanceTables(Set<String> 
tableNamesWithType) {
     // Get all jobMetadata for all the given tables with a single ZK read.
     Map<String, Map<String, String>> allJobMetadataByJobId =
-        
_pinotHelixResourceManager.getAllJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE),
+        
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE),
             jobMetadata -> tableNamesWithType.contains(
                 
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)));
     Map<String, Map<String, Map<String, String>>> tableJobMetadataMap = new 
HashMap<>();
@@ -216,7 +215,8 @@ public class RebalanceChecker extends 
ControllerPeriodicTask<Void> {
   }
 
   private static void abortExistingJobs(String tableNameWithType, 
PinotHelixResourceManager pinotHelixResourceManager) {
-    boolean updated = 
pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+    boolean updated =
+        pinotHelixResourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobTypes.TABLE_REBALANCE,
         jobMetadata -> {
           String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
           try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
index 0747ce7208..1536186dde 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,7 +37,7 @@ import 
org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -210,7 +209,7 @@ public class TableRebalanceManager {
    */
   public List<String> cancelRebalance(String tableNameWithType) {
     List<String> cancelledJobIds = new ArrayList<>();
-    boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobType.TABLE_REBALANCE,
+    boolean updated = _resourceManager.updateJobsForTable(tableNameWithType, 
ControllerJobTypes.TABLE_REBALANCE,
         jobMetadata -> {
           String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
           try {
@@ -246,7 +245,7 @@ public class TableRebalanceManager {
   public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId)
       throws JsonProcessingException {
     Map<String, String> controllerJobZKMetadata =
-        _resourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TABLE_REBALANCE);
+        _resourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TABLE_REBALANCE);
     if (controllerJobZKMetadata == null) {
       LOGGER.warn("Rebalance job with ID: {} not found", jobId);
       throw new NotFoundException("Rebalance job with ID: " + jobId + " not 
found");
@@ -292,7 +291,7 @@ public class TableRebalanceManager {
   public static String rebalanceJobInProgress(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
     // Get all jobMetadata for the given table with a single ZK read.
     Map<String, Map<String, String>> allJobMetadataByJobId =
-        
ControllerZkHelixUtils.getAllControllerJobs(EnumSet.of(ControllerJobType.TABLE_REBALANCE),
+        
ControllerZkHelixUtils.getAllControllerJobs(Set.of(ControllerJobTypes.TABLE_REBALANCE),
             jobMetadata -> tableNameWithType.equals(
                 
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)), 
propertyStore);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index bd30e98168..32fedd509f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -29,7 +29,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -282,7 +282,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     Map<String, String> jobMetadata =
         createJobMetadata(_tableNameWithType, _rebalanceJobId, 
_tableRebalanceProgressStats, _tableRebalanceContext);
     ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, 
_rebalanceJobId, jobMetadata,
-        ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> {
+        ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> {
           // In addition to updating job progress status, the observer also 
checks if the job status is IN_PROGRESS.
           // If not, then no need to update the job status, and we keep this 
status to end the job promptly.
           if (prevJobMetadata == null) {
@@ -318,7 +318,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(tableRebalanceProgressStats));
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 6513e47a2f..0d89a94d54 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -100,14 +100,14 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TENANT_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TENANT_REBALANCE.name());
     try {
       
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(_progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
     }
-    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobType.TENANT_REBALANCE);
+    _pinotHelixResourceManager.addControllerJobToZK(_jobId, jobMetadata, 
ControllerJobTypes.TENANT_REBALANCE);
     _numUpdatesToZk++;
     LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", 
_numUpdatesToZk, _jobId);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
index de1e8d6aa9..04b661af1e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
@@ -21,11 +21,11 @@ package org.apache.pinot.controller.helix.core.util;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
@@ -33,7 +33,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -90,7 +90,7 @@ public class ControllerZkHelixUtils {
    * @param propertyStore the ZK property store to read from
    * @return a map of jobId to job metadata for all the jobs that match the 
given job types and metadata checker
    */
-  public static Map<String, Map<String, String>> 
getAllControllerJobs(EnumSet<ControllerJobType> jobTypes,
+  public static Map<String, Map<String, String>> 
getAllControllerJobs(Set<ControllerJobType> jobTypes,
       Predicate<Map<String, String>> jobMetadataChecker, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
     Map<String, Map<String, String>> controllerJobs = new HashMap<>();
     for (ControllerJobType jobType : jobTypes) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index b424fc7193..fbf3b2d6b9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -40,7 +40,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -370,7 +370,7 @@ public class RebalanceCheckerTest {
     HelixManager helixZkManager = mock(HelixManager.class);
     ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
     String zkPath =
-        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE.name());
+        
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobTypes.TABLE_REBALANCE.name());
     ZNRecord jobsZnRecord = new ZNRecord("jobs");
     when(propertyStore.get(eq(zkPath), any(), 
eq(AccessOption.PERSISTENT))).thenReturn(jobsZnRecord);
     
when(helixZkManager.getClusterManagmentTool()).thenReturn(mock(HelixAdmin.class));
@@ -380,16 +380,16 @@ public class RebalanceCheckerTest {
 
     pinotHelixManager.addControllerJobToZK("job1",
         ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000", 
"tableName", "table01"),
-        ControllerJobType.TABLE_REBALANCE, jmd -> true);
+        ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
     pinotHelixManager.addControllerJobToZK("job2",
         ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000", 
"tableName", "table01"),
-        ControllerJobType.TABLE_REBALANCE, jmd -> false);
+        ControllerJobTypes.TABLE_REBALANCE, jmd -> false);
     pinotHelixManager.addControllerJobToZK("job3",
         ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000", 
"tableName", "table02"),
-        ControllerJobType.TABLE_REBALANCE, jmd -> true);
+        ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
     pinotHelixManager.addControllerJobToZK("job4",
         ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000", 
"tableName", "table02"),
-        ControllerJobType.TABLE_REBALANCE, jmd -> true);
+        ControllerJobTypes.TABLE_REBALANCE, jmd -> true);
     Map<String, Map<String, String>> jmds = jobsZnRecord.getMapFields();
     assertEquals(jmds.size(), 3);
     assertTrue(jmds.containsKey("job1"));
@@ -397,13 +397,13 @@ public class RebalanceCheckerTest {
     assertTrue(jmds.containsKey("job4"));
 
     Set<String> expectedJobs01 = new HashSet<>();
-    pinotHelixManager.updateJobsForTable("table01", 
ControllerJobType.TABLE_REBALANCE,
+    pinotHelixManager.updateJobsForTable("table01", 
ControllerJobTypes.TABLE_REBALANCE,
         jmd -> expectedJobs01.add(jmd.get("jobId")));
     assertEquals(expectedJobs01.size(), 1);
     assertTrue(expectedJobs01.contains("job1"));
 
     Set<String> expectedJobs02 = new HashSet<>();
-    pinotHelixManager.updateJobsForTable("table02", 
ControllerJobType.TABLE_REBALANCE,
+    pinotHelixManager.updateJobsForTable("table02", 
ControllerJobTypes.TABLE_REBALANCE,
         jmd -> expectedJobs02.add(jmd.get("jobId")));
     assertEquals(expectedJobs02.size(), 2);
     assertTrue(expectedJobs02.contains("job3"));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index db23d24121..9a2db888a1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
@@ -149,7 +149,7 @@ public class TenantRebalancerTest extends ControllerTest {
   private TenantRebalanceProgressStats getProgress(String jobId)
       throws JsonProcessingException {
     Map<String, String> controllerJobZKMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.TENANT_REBALANCE);
+        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.TENANT_REBALANCE);
     if (controllerJobZKMetadata == null) {
       return null;
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
index 9aee5e9e11..70ea1ea5f8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtilsTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.controller.helix.core.util;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
@@ -42,7 +42,7 @@ public class ControllerZkHelixUtilsTest {
     // Setup job limits
     ControllerConf controllerConf = mock(ControllerConf.class);
     when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2);
-    ControllerJobType.init(controllerConf);
+    ControllerJobTypes.init(controllerConf);
 
     TableRebalanceProgressStats inProgressStats = new 
TableRebalanceProgressStats();
     inProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
@@ -53,24 +53,24 @@ public class ControllerZkHelixUtilsTest {
 
     Map<String, Map<String, String>> jobMetadataMap = Map.of(
         "job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"1000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(inProgressStats)),
         "job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"3000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(completedStats)),
         "job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"2000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(abortedStats)),
         "job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"4000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(inProgressStats)),
         "job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"5000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(inProgressStats))
     );
 
     Map<String, Map<String, String>> updatedJobMetadataMap =
-        ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, 
ControllerJobType.TABLE_REBALANCE);
+        ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, 
ControllerJobTypes.TABLE_REBALANCE);
     // Even though the limit is 2, we should not delete the in-progress jobs
     assertEquals(updatedJobMetadataMap.size(), 3);
     assertEquals(updatedJobMetadataMap.keySet(), Set.of("job1", "job4", 
"job5"));
@@ -82,7 +82,7 @@ public class ControllerZkHelixUtilsTest {
     // Setup job limits
     ControllerConf controllerConf = mock(ControllerConf.class);
     when(controllerConf.getMaxTableRebalanceZkJobs()).thenReturn(2);
-    ControllerJobType.init(controllerConf);
+    ControllerJobTypes.init(controllerConf);
 
     TableRebalanceProgressStats completedStats = new 
TableRebalanceProgressStats();
     completedStats.setStatus(RebalanceResult.Status.DONE);
@@ -91,24 +91,24 @@ public class ControllerZkHelixUtilsTest {
 
     Map<String, Map<String, String>> jobMetadataMap = Map.of(
         "job1", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"1000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(completedStats)),
         "job2", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"5000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(completedStats)),
         "job3", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"3000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(abortedStats)),
         "job4", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"2000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(completedStats)),
         "job5", Map.of(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
"4000",
-            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name(),
+            CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name(),
             RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, 
JsonUtils.objectToString(abortedStats))
     );
 
     Map<String, Map<String, String>> updatedJobMetadataMap =
-        ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, 
ControllerJobType.TABLE_REBALANCE);
+        ControllerZkHelixUtils.expireControllerJobsInZk(jobMetadataMap, 
ControllerJobTypes.TABLE_REBALANCE);
     assertEquals(updatedJobMetadataMap.size(), 2);
     // Retain the two most recent jobs based on submission time
     assertEquals(updatedJobMetadataMap.keySet(), Set.of("job2", "job5"));
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index eca80e5017..1d786400ea 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -44,7 +44,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
 import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
 import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
@@ -457,7 +457,7 @@ public class LLCRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegr
   private void testForceCommitInternal(String realtimeTableName, String jobId, 
Set<String> consumingSegments,
       long timeoutMs) {
     Map<String, String> jobMetadata =
-        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobType.FORCE_COMMIT);
+        _helixResourceManager.getControllerJobZKMetadata(jobId, 
ControllerJobTypes.FORCE_COMMIT);
     assertNotNull(jobMetadata);
     
assertNotNull(jobMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST));
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 85dc9fd79b..a41d25ab41 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.utils.regex.Matcher;
 import org.apache.pinot.common.utils.regex.Pattern;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
@@ -1291,10 +1291,10 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name());
     
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
         JsonUtils.objectToString(progressStats));
-    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobTypes.TABLE_REBALANCE,
         prevJobMetadata -> true);
 
     // Add a new server (to force change in instance assignment) and enable 
reassignInstances to ensure that the
@@ -1313,7 +1313,7 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
     progressStats.setStatus(RebalanceResult.Status.DONE);
     
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
         JsonUtils.objectToString(progressStats));
-    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobType.TABLE_REBALANCE,
+    ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, 
jobMetadata, ControllerJobTypes.TABLE_REBALANCE,
         prevJobMetadata -> true);
 
     // Stop the added server
@@ -1335,15 +1335,16 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
     String inProgressJobId = 
TableRebalancer.createUniqueRebalanceJobIdentifier();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, inProgressJobId);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, "1000");
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name());
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobTypes.TABLE_REBALANCE.name());
     TableRebalanceProgressStats progressStats = new 
TableRebalanceProgressStats();
     progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
     
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
         JsonUtils.objectToString(progressStats));
     ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, 
inProgressJobId, jobMetadata,
-        ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+        ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
 
-    
assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId, 
ControllerJobType.TABLE_REBALANCE));
+    assertNotNull(
+        _helixResourceManager.getControllerJobZKMetadata(inProgressJobId, 
ControllerJobTypes.TABLE_REBALANCE));
 
     // Add a DONE rebalance
     String doneJobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
@@ -1354,9 +1355,9 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
         JsonUtils.objectToString(progressStats));
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
String.valueOf(System.currentTimeMillis()));
     ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, doneJobId, 
jobMetadata,
-        ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+        ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
 
-    assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, 
ControllerJobType.TABLE_REBALANCE));
+    assertNotNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, 
ControllerJobTypes.TABLE_REBALANCE));
 
     // Add another DONE rebalance
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, 
"anotherTable_REALTIME");
@@ -1365,16 +1366,17 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
         String.valueOf(System.currentTimeMillis() + 1000));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, anotherDoneJobId);
     ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, 
anotherDoneJobId, jobMetadata,
-        ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> true);
+        ControllerJobTypes.TABLE_REBALANCE, prevJobMetadata -> true);
 
     assertNotNull(
-        _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId, 
ControllerJobType.TABLE_REBALANCE));
+        _helixResourceManager.getControllerJobZKMetadata(anotherDoneJobId, 
ControllerJobTypes.TABLE_REBALANCE));
 
     // Verify that the first DONE job is cleaned up
-    assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, 
ControllerJobType.TABLE_REBALANCE));
+    assertNull(_helixResourceManager.getControllerJobZKMetadata(doneJobId, 
ControllerJobTypes.TABLE_REBALANCE));
 
     // Verify that the in-progress job is still there even though it has the 
oldest submission time
-    
assertNotNull(_helixResourceManager.getControllerJobZKMetadata(inProgressJobId, 
ControllerJobType.TABLE_REBALANCE));
+    assertNotNull(
+        _helixResourceManager.getControllerJobZKMetadata(inProgressJobId, 
ControllerJobTypes.TABLE_REBALANCE));
   }
 
   private String getReloadJobIdFromResponse(String response) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java
new file mode 100644
index 0000000000..822c7d9c3e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/controller/ControllerJobType.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.controller;
+
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Interface for controller job types that store metadata in the ZK property store.
+ */
+public interface ControllerJobType {
+
+  /**
+   * Name of the controller job type, which is used in the ZK property store path for storing job metadata for jobs
+   * of this type.
+   */
+  String name();
+
+  /**
+   * Gets the maximum number of job metadata entries that can be stored in ZK for this job type.
+   */
+  default Integer getZkNumJobsLimit() {
+    return CommonConstants.ControllerJob.DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK;
+  }
+
+  /**
+   * Checks if the job metadata entry can be safely deleted. Note that the job metadata entry will only be attempted
+   * to be deleted when the number of entries in the job metadata map exceeds the configured limit for the job type.
+   *
+   * @param jobMetadataEntry The job metadata entry to check - a pair of job ID and job metadata map
+   * @return true if the job metadata entry can be safely deleted, false otherwise
+   */
+  default boolean canDelete(Pair<String, Map<String, String>> jobMetadataEntry) {
+    return true;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 14a81ba0a1..611f301f67 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1307,6 +1307,8 @@ public class CommonConstants {
     public static final String SUBMISSION_TIME_MS = "submissionTimeMs";
     public static final String MESSAGE_COUNT = "messageCount";
 
+    public static final Integer DEFAULT_MAXIMUM_CONTROLLER_JOBS_IN_ZK = 100;
+
     /**
      * Segment reload job ZK props
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to