swaminathanmanish commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1878219843


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -190,19 +190,51 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
 
   /**
   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
-   * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
+   * Before uploading the segments, updates the metadata with the expected results
+   * of the successful execution of current subtask.
+   * The expected result updated in metadata is read by the next iteration of Task Generator.
    */
   @Override
-  public void postProcess(PinotTaskConfig pinotTaskConfig) {
-    Map<String, String> configs = pinotTaskConfig.getConfigs();
-    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+  protected void preUploadSegments(SegmentUploadContext context)
+      throws Exception {
+    super.preUploadSegments(context);
+    String realtimeTableName = context.getTableNameWithType();
+    while (true) {
+      ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+          _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+              RealtimeToOfflineSegmentsTask.TASK_TYPE);
+      int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+
+      Map<String, List<String>> realtimeSegmentVsCorrespondingOfflineSegmentMap =
+          realtimeToOfflineSegmentsTaskMetadata.getRealtimeSegmentVsCorrespondingOfflineSegmentMap();
+
+      List<String> segmentsFrom =
+          Arrays.stream(StringUtils.split(context.getInputSegmentNames(), MinionConstants.SEGMENT_NAME_SEPARATOR))
+              .map(String::trim).collect(Collectors.toList());
+
+      List<String> segmentsTo =
+          context.getSegmentConversionResults().stream().map(SegmentConversionResult::getSegmentName)
+              .collect(Collectors.toList());
+
+      for (String segmentFrom : segmentsFrom) {
+        Preconditions.checkState(!realtimeSegmentVsCorrespondingOfflineSegmentMap.containsKey(segmentFrom));
+        realtimeSegmentVsCorrespondingOfflineSegmentMap.put(segmentFrom, segmentsTo);

Review Comment:
   Are we storing the entire output list for every input segment here? If so, we don't need to.
   As suggested, we can have two separate lists: one for the input segments and one for the output segments.
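A minimal sketch of the two-list layout, under stated assumptions. `TwoListTaskMetadata`, `VersionedStore` and `PreUploadSketch` are hypothetical names, not Pinot classes, and `VersionedStore` is an in-memory stand-in for the ZNode version check. The sketch also assumes that the truncated remainder of the diff's `while (true)` loop writes the record back with `expectedVersion` and re-reads on a version conflict, similar to how the removed `postProcess` passed `_expectedVersion` to `setTaskMetadataZNRecord`. With the map, each of N input segments is keyed to the same M-element output list, so a serialized record would presumably repeat that list N times. Two flat lists hold only N + M names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical metadata shape (not Pinot's RealtimeToOfflineSegmentsTaskMetadata):
// two flat lists instead of a Map<String, List<String>> keyed by input segment.
final class TwoListTaskMetadata {
  private final List<String> _inputSegments;   // realtime segments consumed by recorded subtasks
  private final List<String> _outputSegments;  // offline segments those subtasks are expected to upload

  TwoListTaskMetadata(List<String> inputSegments, List<String> outputSegments) {
    _inputSegments = Collections.unmodifiableList(new ArrayList<>(inputSegments));
    _outputSegments = Collections.unmodifiableList(new ArrayList<>(outputSegments));
  }

  static TwoListTaskMetadata empty() {
    return new TwoListTaskMetadata(Collections.emptyList(), Collections.emptyList());
  }

  List<String> getInputSegments() {
    return _inputSegments;
  }

  List<String> getOutputSegments() {
    return _outputSegments;
  }

  // Returns a new instance with one subtask's inputs and outputs appended once each.
  TwoListTaskMetadata withSubtask(List<String> segmentsFrom, List<String> segmentsTo) {
    for (String segmentFrom : segmentsFrom) {
      // Mirrors the diff's Preconditions.checkState: an input segment may be recorded only once.
      if (_inputSegments.contains(segmentFrom)) {
        throw new IllegalStateException("Input segment already recorded: " + segmentFrom);
      }
    }
    List<String> inputs = new ArrayList<>(_inputSegments);
    inputs.addAll(segmentsFrom);
    List<String> outputs = new ArrayList<>(_outputSegments);
    outputs.addAll(segmentsTo);
    return new TwoListTaskMetadata(inputs, outputs);
  }
}

// Hypothetical in-memory stand-in for a versioned ZNode: a write succeeds only if the version is unchanged.
final class VersionedStore<T> {
  static final class Snapshot<V> {
    final V value;
    final int version;

    Snapshot(V value, int version) {
      this.value = value;
      this.version = version;
    }
  }

  private T _value;
  private int _version = 0;

  VersionedStore(T initial) {
    _value = initial;
  }

  // Reads value and version together so they are consistent with each other.
  synchronized Snapshot<T> read() {
    return new Snapshot<>(_value, _version);
  }

  // Compare-and-set on the version; returns false if another writer updated the record first.
  synchronized boolean set(T newValue, int expectedVersion) {
    if (_version != expectedVersion) {
      return false;
    }
    _value = newValue;
    _version++;
    return true;
  }
}

final class PreUploadSketch {
  // Read-modify-write retry loop; assumed to match the intent of the diff's while (true) loop.
  static void recordSubtask(VersionedStore<TwoListTaskMetadata> store, List<String> segmentsFrom,
      List<String> segmentsTo) {
    while (true) {
      VersionedStore.Snapshot<TwoListTaskMetadata> snapshot = store.read();
      TwoListTaskMetadata updated = snapshot.value.withSubtask(segmentsFrom, segmentsTo);
      if (store.set(updated, snapshot.version)) {
        return;
      }
      // Version changed between read and write: re-read the latest metadata and try again.
    }
  }
}
```

One trade-off: flat lists drop the link between each input segment and its outputs. That is only safe if the next generator run needs just the set of consumed inputs and the set of expected outputs, not which outputs came from which inputs.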



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

