klsince commented on code in PR #11740:
URL: https://github.com/apache/pinot/pull/11740#discussion_r1361254945


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2063,28 +2070,76 @@ public boolean addNewReloadAllSegmentsJob(String 
tableNameWithType, String jobId
         
ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.RELOAD_SEGMENT));
   }
 
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload 
into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobResourcePath zkPath to add the new job metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
   public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, String jobResourcePath) {
+    return addControllerJobToZK(jobId, jobMetadata, jobResourcePath, prev -> 
true);
+  }
+
+  /**
+   * Adds a new job metadata for controller job like table rebalance or reload 
into ZK
+   * @param jobId job's UUID
+   * @param jobMetadata the job metadata
+   * @param jobResourcePath zkPath to add the new job metadata
+   * @param prevJobMetadataChecker to check the previous job metadata before 
adding new one
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addControllerJobToZK(String jobId, Map<String, String> 
jobMetadata, String jobResourcePath,
+      Predicate<Map<String, String>> prevJobMetadataChecker) {
     
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
 != null,
         "Submission Time in JobMetadata record not set. Cannot expire these 
records");
     Stat stat = new Stat();
-    ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
-    if (tableJobsZnRecord != null) {
-      Map<String, Map<String, String>> tasks = 
tableJobsZnRecord.getMapFields();
-      tasks.put(jobId, jobMetadata);
-      if (tasks.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
-        tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
+    ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat, 
AccessOption.PERSISTENT);
+    if (jobsZnRecord != null) {
+      Map<String, Map<String, String>> jobMetadataMap = 
jobsZnRecord.getMapFields();
+      Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+      if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+        return false;
+      }
+      jobMetadataMap.put(jobId, jobMetadata);
+      if (jobMetadataMap.size() > 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+        jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) -> 
Long.compare(
                 
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
                 
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
             .collect(Collectors.toList()).subList(0, 
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
             .stream().collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
       }
-      tableJobsZnRecord.setMapFields(tasks);
-      return _propertyStore.set(jobResourcePath, tableJobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+      jobsZnRecord.setMapFields(jobMetadataMap);
+      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
     } else {
-      tableJobsZnRecord = new ZNRecord(jobResourcePath);
-      tableJobsZnRecord.setMapField(jobId, jobMetadata);
-      return _propertyStore.set(jobResourcePath, tableJobsZnRecord, 
AccessOption.PERSISTENT);
+      jobsZnRecord = new ZNRecord(jobResourcePath);
+      jobsZnRecord.setMapField(jobId, jobMetadata);
+      return _propertyStore.set(jobResourcePath, jobsZnRecord, 
AccessOption.PERSISTENT);
+    }
+  }
+
+  /**
+   * Update existing job metadata belong to the table
+   * @param tableNameWithType whose job metadata to be updated
+   * @param jobResourcePath zkPath to look for the job metadata for the table
+   * @param updater to modify the job metadata in place
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean updateAllJobsForTable(String tableNameWithType, String 
jobResourcePath,

Review Comment:
   sgtm
   
   > making the updater on the whole jobMetadataMap which is more general 
   
   I didn't get this one. the updater is to handle the whole `jobMetadataMap` 
currently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to