saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931815219


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String 
tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metadata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = 
_propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(jobId);
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find job metadata for id: {}", jobId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns a Map of jobId to job's ZK metadata for the given table
+   * @param tableNameWithType the table for which jobs are to be fetched
+   * @return A Map of jobId to job properties
+   */
+  public Map<String, Map<String, String>> getAllJobsForTable(String 
tableNameWithType) {
+    String jobsResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, 
-1);
+      Map<String, Map<String, String>> controllerJobs = 
tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> 
job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              
.equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find controller job node for table : {}", 
tableNameWithType, e);
+    }
+
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param segmentName Name of the segment being reloaded
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. 
Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String 
segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE,
 tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString());
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, 
segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  /**
+   * Adds a new reload all segments job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. 
Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String 
jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE,
 tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    
jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, 
Map<String, String> jobMetadata) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to