Harnoor7 commented on code in PR #14578: URL: https://github.com/apache/pinot/pull/14578#discussion_r1875921199
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java: ########## @@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, /** * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table. - * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update - * watermark in the ZNode + * Update the number of subtasks pending atomically. If number of subtasks left are zero, proceeds to update + * watermark in the ZNode. * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it */ @Override public void postProcess(PinotTaskConfig pinotTaskConfig) { Map<String, String> configs = pinotTaskConfig.getConfigs(); String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY); - long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY)); - RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata = - new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs); - _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE, - _expectedVersion); + + while (true) { + ZNRecord realtimeToOfflineSegmentsTaskZNRecord = + _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName, + RealtimeToOfflineSegmentsTask.TASK_TYPE); + int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion(); + + RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata = + RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord); + + int numSubtasksLeft = realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() - 1; + Preconditions.checkState(numSubtasksLeft >= 0, + "Num of minion subtasks pending for table: %s should be greater than equal to zero.", + realtimeTableName); + realtimeToOfflineSegmentsTaskMetadata.setNumSubtasksPending(numSubtasksLeft); + + try { + if (numSubtasksLeft == 0) { Review Comment: current approach (Before this PR) is also not good, we might have to maintain a attribute in segment metadata marking that segment has been moved to offline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org