swaminathanmanish commented on code in PR #14578:
URL: https://github.com/apache/pinot/pull/14578#discussion_r1875843168

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
+   * Update the number of subtasks pending atomically. If number of subtasks left are zero, proceeds to update
+   * watermark in the ZNode.
    * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
    */
   @Override
   public void postProcess(PinotTaskConfig pinotTaskConfig) {
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+
+    while (true) {
+      ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+          _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+              RealtimeToOfflineSegmentsTask.TASK_TYPE);
+      int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+
+      int numSubtasksLeft = realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() - 1;
+      Preconditions.checkState(numSubtasksLeft >= 0,
+          "Num of minion subtasks pending for table: %s should be greater than equal to zero.",
+          realtimeTableName);
+      realtimeToOfflineSegmentsTaskMetadata.setNumSubtasksPending(numSubtasksLeft);
+
+      try {
+        if (numSubtasksLeft == 0) {

Review Comment:
   What happens when a minion task fails before decrementing this counter? The count would then never go down to zero, right?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,9 +149,20 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
       long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
       long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
+      ZNRecord realtimeToOfflineZNRecord =
+          _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+              realtimeTableName);
+      int expectedVersion = realtimeToOfflineZNRecord != null ? realtimeToOfflineZNRecord.getVersion() : -1;
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          getRTOTaskMetadata(realtimeTableName, completedSegmentsZKMetadata, bucketMs, realtimeToOfflineZNRecord);
+
+      Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() == 0,

Review Comment:
   Minions can fail before decrementing the pending count, which would trip this invariant.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
+   * Update the number of subtasks pending atomically. If number of subtasks left are zero, proceeds to update
+   * watermark in the ZNode.
    * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
    */
   @Override
   public void postProcess(PinotTaskConfig pinotTaskConfig) {
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+
+    while (true) {

Review Comment:
   realtimeToOffline is a periodic task that moves segments from the realtime table to the offline table. It's on a schedule (it should not be allowed to run ad hoc). Why don't we have the task generator advance the watermark instead of doing it from the minion? The task generator needs to handle the minion execution failure scenarios anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

For additional commands, e-mail: commits-h...@pinot.apache.org