Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r910907030


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java:
##########
@@ -27,6 +27,15 @@
  */
 public abstract class SegmentDataManager {
   private int _referenceCount = 1;
+  private long _segmentLoadTimeInMillisEpoch = System.currentTimeMillis();

Review Comment:
   (minor) Rename to `_loadTimeMs` for simplicity; same for the getter and setter.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1977,6 +1979,80 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String tableNameWithType, String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      return tableJobsRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public void addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private void addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> taskMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    ZNRecord tableJobsZnRecord;
+
+    if (_propertyStore.exists(jobResourcePath, AccessOption.PERSISTENT)) {

Review Comment:
   No need to check whether the record exists first. We can call `get()` directly and check whether it returns `null`, which also avoids a race condition between the existence check and the read.
   When the record exists, we should track the record version (by passing a `Stat` into `get()`) and check the expected version when setting the record, to prevent a race condition with concurrent updates.
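
The read-then-versioned-write pattern described above can be sketched as follows. This is only an illustration under stated assumptions: `VersionedStore`, `Versioned`, and `ReloadJobWriter` are hypothetical in-memory stand-ins, not the Helix property store API, and the convention that an expected version of `-1` means "the record must not exist yet" belongs to this sketch only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a versioned property store (not the Helix API).
final class VersionedStore {
  // A record's map fields together with the version at which they were read.
  static final class Versioned {
    final Map<String, Map<String, String>> mapFields;
    final int version;

    Versioned(Map<String, Map<String, String>> mapFields, int version) {
      this.mapFields = Collections.unmodifiableMap(new HashMap<>(mapFields));
      this.version = version;
    }
  }

  private final Map<String, Versioned> _records = new ConcurrentHashMap<>();

  // Returns null when nothing is stored at the path, so no separate exists() call is needed.
  Versioned get(String path) {
    return _records.get(path);
  }

  // Writes only if the stored version matches expectedVersion.
  // Assumption of this sketch: -1 means "the record must not exist yet".
  synchronized boolean set(String path, Map<String, Map<String, String>> mapFields, int expectedVersion) {
    Versioned current = _records.get(path);
    int currentVersion = current == null ? -1 : current.version;
    if (currentVersion != expectedVersion) {
      return false; // Someone else updated the record since it was read.
    }
    _records.put(path, new Versioned(mapFields, currentVersion + 1));
    return true;
  }
}

final class ReloadJobWriter {
  // Reads the record once, adds the job entry, and writes back with the version that was read.
  // Retries from the read when the versioned write loses a race.
  static boolean addJob(VersionedStore store, String path, String jobId, Map<String, String> jobMetadata,
      int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      VersionedStore.Versioned current = store.get(path);
      Map<String, Map<String, String>> updated =
          current == null ? new HashMap<>() : new HashMap<>(current.mapFields);
      updated.put(jobId, new HashMap<>(jobMetadata));
      int expectedVersion = current == null ? -1 : current.version;
      if (store.set(path, updated, expectedVersion)) {
        return true;
      }
    }
    return false;
  }
}
```

With a real store, the version would presumably come from the `Stat` filled in by `get()`, as the comment suggests; the retry count here is an arbitrary choice for the sketch.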



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",
+            tableNameWithType, segmentName, e);
+      }
+      return new SuccessResponse("Sent " + msgInfo + " reload messages");

Review Comment:
   ```suggestion
         return new SuccessResponse("Sent " + msgInfo.getLeft() + " reload messages");
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   Suggest reflecting this failure back to the user instead of just logging an error and returning success. Otherwise the user won't be able to read the reload status.
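
One way to surface the failure, sketched under assumptions: the response message states whether the job metadata was saved, so the caller knows up front whether the reload status can be looked up later. `ReloadResponseSketch` and `buildMessage` are hypothetical names for illustration and are not part of the PR.

```java
// Hypothetical helper (not part of the PR): builds the user-facing message for a reload request.
final class ReloadResponseSketch {
  static String buildMessage(int numMessagesSent, String jobId, boolean jobMetadataSaved) {
    StringBuilder message = new StringBuilder("Sent ").append(numMessagesSent).append(" reload messages");
    if (jobMetadataSaved) {
      // The job id lets the user query the reload status later.
      message.append(". Job id: ").append(jobId);
    } else {
      // Tell the user explicitly that status tracking is unavailable for this request.
      message.append(", but failed to save job metadata to ZooKeeper; reload status will not be available");
    }
    return message.toString();
  }
}
```

In the handler, the flag would be set to `false` inside the `catch` block rather than only logging the exception.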



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

