Harnoor7 commented on code in PR #14578:
URL: https://github.com/apache/pinot/pull/14578#discussion_r1875889632


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
 
   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
+   * Update the number of subtasks pending atomically. If number of subtasks left are zero, proceeds to update
+   * watermark in the ZNode.
    * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
    */
   @Override
   public void postProcess(PinotTaskConfig pinotTaskConfig) {
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+
+    while (true) {
+      ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+          _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+              RealtimeToOfflineSegmentsTask.TASK_TYPE);
+      int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+
+      int numSubtasksLeft = realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() - 1;
+      Preconditions.checkState(numSubtasksLeft >= 0,
+          "Num of minion subtasks pending for table: %s should be greater than equal to zero.",
+          realtimeTableName);
+      realtimeToOfflineSegmentsTaskMetadata.setNumSubtasksPending(numSubtasksLeft);
+
+      try {
+        if (numSubtasksLeft == 0) {

Review Comment:
   In that case, the minion job will keep retrying and will eventually fail.
   The same subtasks will then be picked up again in the next iteration.
   This is also how things behave today: if the minion fails after a segment
   has been moved to offline, the same segment gets picked up again next time.
   Since the segment name will be the same, the already existing offline
   segment simply gets overwritten.
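
   To make the argument concrete, here is a minimal in-memory sketch of why
   a retry is harmless when the output is keyed by segment name. It is not
   Pinot code: `IdempotentRetrySketch`, `OfflineStore`, `runAttempt`, and the
   segment name `seg_0` are all hypothetical names made up for illustration,
   and the map only stands in for the offline table.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of retrying a subtask; none of these names exist in Pinot. */
public class IdempotentRetrySketch {

  /** Stand-in for the offline table: segments are keyed by segment name. */
  static final class OfflineStore {
    private final Map<String, String> _segments = new HashMap<>();

    /** Uploading under an existing name replaces the previous copy. */
    void upload(String segmentName, String payload) {
      _segments.put(segmentName, payload);
    }

    int size() {
      return _segments.size();
    }

    String get(String segmentName) {
      return _segments.get(segmentName);
    }
  }

  /**
   * Runs one subtask attempt. When failAfterUpload is true, the attempt
   * dies after the segment is uploaded but before the metadata update,
   * which is the failure window discussed in the comment above.
   */
  static boolean runAttempt(OfflineStore store, String segmentName, String payload,
      boolean failAfterUpload) {
    store.upload(segmentName, payload);
    if (failAfterUpload) {
      return false; // metadata not updated, so the same window is picked again
    }
    return true;
  }

  public static void main(String[] args) {
    OfflineStore store = new OfflineStore();
    boolean first = runAttempt(store, "seg_0", "attempt-1", true);
    boolean second = runAttempt(store, "seg_0", "attempt-2", false);
    System.out.println("first=" + first + " second=" + second
        + " segments=" + store.size() + " content=" + store.get("seg_0"));
  }
}
```

   The point of the sketch is only that the second attempt replaces the
   first attempt's output instead of adding a duplicate, assuming the
   regenerated segment really does get the same name.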



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

