This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 704f73d4f9 update ControllerJobType from enum to string (#12518) 704f73d4f9 is described below commit 704f73d4f91e369a8a3fb2081ee5a3ba50a0daa5 Author: Haitao Zhang <hai...@startree.ai> AuthorDate: Wed Feb 28 23:16:43 2024 -0800 update ControllerJobType from enum to string (#12518) --- .../pinot/common/metadata/ZKMetadataProvider.java | 5 ++--- .../metadata/controllerjob/ControllerJobType.java | 15 +++++++++++++-- .../api/resources/PinotTableRestletResource.java | 15 +++++---------- .../helix/core/PinotHelixResourceManager.java | 22 +++++++++++----------- .../rebalance/ZkBasedTableRebalanceObserver.java | 2 +- .../tenant/ZkBasedTenantRebalanceObserver.java | 2 +- 6 files changed, 33 insertions(+), 28 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index eba5806365..d69d386a26 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -32,7 +32,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.assignment.InstancePartitions; -import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; @@ -174,8 +173,8 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName); } - public static String constructPropertyStorePathForControllerJob(ControllerJobType jobType) { - return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType.name()); + public static String constructPropertyStorePathForControllerJob(String jobType) { + return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, jobType); } public static String constructPropertyStorePathForResourceConfig(String resourceName) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java index 025d7b6c39..e1a8efb8af 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java @@ -18,6 +18,17 @@ */ package org.apache.pinot.common.metadata.controllerjob; -public enum ControllerJobType { - RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE +import com.google.common.collect.ImmutableSet; +import java.util.Set; + + +public class ControllerJobType { + private ControllerJobType() { + } + public static final String RELOAD_SEGMENT = "RELOAD_SEGMENT"; + public static final String FORCE_COMMIT = "FORCE_COMMIT"; + public static final String TABLE_REBALANCE = "TABLE_REBALANCE"; + public static final String TENANT_REBALANCE = "TENANT_REBALANCE"; + public static final Set<String> + VALID_CONTROLLER_JOB_TYPE = ImmutableSet.of(RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TENANT_REBALANCE); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 23a405514b..8fa1d3b1a9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -1082,20 +1082,15 @@ public class PinotTableRestletResource { List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); - Set<ControllerJobType> validJobTypes = - java.util.Arrays.stream(ControllerJobType.values()).collect(Collectors.toSet()); - Set<ControllerJobType> jobTypesToFilter = null; + Set<String> jobTypesToFilter = null; if (StringUtils.isNotEmpty(jobTypesString)) { - try { - jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))).stream() - .map(type -> ControllerJobType.valueOf(type)).collect(Collectors.toSet()); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Valid Types are: " + validJobTypes); - } + jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))) + .stream().collect(Collectors.toSet()); } Map<String, Map<String, String>> result = new HashMap<>(); for (String tableNameWithType : tableNamesWithType) { - result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null ? validJobTypes : jobTypesToFilter, + result.putAll(_pinotHelixResourceManager.getAllJobs(jobTypesToFilter == null + ? ControllerJobType.VALID_CONTROLLER_JOB_TYPE : jobTypesToFilter, jobMetadata -> jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE) .equals(tableNameWithType))); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 22ba2de5a7..5e19ea9cf6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2036,7 +2036,7 @@ public class PinotHelixResourceManager { * @return Map representing the job's ZK properties */ @Nullable - public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) { + public Map<String, String> getControllerJobZKMetadata(String jobId, String jobType) { String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT); return jobsZnRecord != null ? jobsZnRecord.getMapFields().get(jobId) : null; @@ -2046,10 +2046,10 @@ public class PinotHelixResourceManager { * Returns a Map of jobId to job's ZK metadata that passes the checker, like for specific tables. * @return A Map of jobId to job properties */ - public Map<String, Map<String, String>> getAllJobs(Set<ControllerJobType> jobTypes, + public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes, Predicate<Map<String, String>> jobMetadataChecker) { Map<String, Map<String, String>> controllerJobs = new HashMap<>(); - for (ControllerJobType jobType : jobTypes) { + for (String jobType : jobTypes) { String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null, AccessOption.PERSISTENT); if (jobsZnRecord == null) { @@ -2059,8 +2059,8 @@ public class PinotHelixResourceManager { for (Map.Entry<String, Map<String, String>> jobMetadataEntry : jobMetadataMap.entrySet()) { String jobId = jobMetadataEntry.getKey(); Map<String, String> jobMetadata = jobMetadataEntry.getValue(); - Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType.name()), - "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType.name(), jobResourcePath, jobId); + Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType), + "Got unexpected jobType: %s at jobResourcePath: %s with jobId: %s", jobType, jobResourcePath, jobId); if (jobMetadataChecker.test(jobMetadata)) { controllerJobs.put(jobId, jobMetadata); } @@ -2083,7 +2083,7 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); @@ -2096,7 +2096,7 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, JsonUtils.objectToString(consumingSegmentsCommitted)); @@ -2116,7 +2116,7 @@ public class PinotHelixResourceManager { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT); @@ -2129,7 +2129,7 @@ public class PinotHelixResourceManager { * @param jobType the type of the job to figure out where job metadata is kept in ZK * @return boolean representing success / failure of the ZK write step */ - public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType) { + public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType) { return addControllerJobToZK(jobId, jobMetadata, jobType, prev -> true); } @@ -2141,7 +2141,7 @@ public class PinotHelixResourceManager { * @param prevJobMetadataChecker to check the previous job metadata before adding new one * @return boolean representing success / failure of the ZK write step */ - public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, ControllerJobType jobType, + public boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata, String jobType, Predicate<Map<String, String>> prevJobMetadataChecker) { Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS) != null, "Submission Time in JobMetadata record not set. Cannot expire these records"); @@ -2178,7 +2178,7 @@ public class PinotHelixResourceManager { * @param updater to modify the job metadata in place * @return boolean representing success / failure of the ZK write step */ - public boolean updateJobsForTable(String tableNameWithType, ControllerJobType jobType, + public boolean updateJobsForTable(String tableNameWithType, String jobType, Consumer<Map<String, String>> updater) { String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType); Stat stat = new Stat(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index f02a62cdee..8386544f3c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -206,7 +206,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE); try { jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(tableRebalanceProgressStats)); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java index 9fabc64d9d..81962f90f2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java @@ -100,7 +100,7 @@ public class ZkBasedTenantRebalanceObserver implements TenantRebalanceObserver { jobMetadata.put(CommonConstants.ControllerJob.TENANT_NAME, _tenantName); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, _jobId); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE.name()); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TENANT_REBALANCE); try { jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(_progressStats)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org