jtao15 commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980338451


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
       //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
       //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
       //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      //
+      // Based on the following considerations:

Review Comment:
   (nit) let's remove/update the comments on L549-L551.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
       //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
       //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
       //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      //
+      // Based on the following considerations:
+      // 1. The BufferTime configuration will do the work of NOT merging the most recent segments, it will cover most
+      //    of the cases
+      // 2. If one wants to merge the most recent segments, and hence changes the BufferTime to 0, we need to make sure
+      //    we do NOT merge the consuming segments
+      // 3. There is a corner case of PauseConsumption, the function will seal the consuming segments and NOT create
+      //    a new one, nor upload the offset metadata.
+      // We decide to ONLY filter out the consuming segments and most recent completed segments for each partition.
+      Map<Integer, SegmentZKMetadata> latestCompletedSegmentInEachPartition = new HashMap<>();
+      List<SegmentZKMetadata> filteredSegments = new ArrayList<>();
       for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+        if (segmentZKMetadata.getStatus().isCompleted()) {
+          // completed segments
+          String[] segmentIdComponents = segmentZKMetadata.getSegmentName().split("__");

Review Comment:
   Better to leverage LLCSegmentName.isLLCSegment() and getPartitionGroupId().
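
To illustrate the concern, here is a minimal, self-contained sketch. It uses a hypothetical stand-in helper (`LlcNameSketch`), not Pinot's actual `LLCSegmentName`, whose exact signatures should be checked against the codebase. It assumes the LLC name layout `<table>__<partitionGroupId>__<sequenceNumber>__<creationTime>`. The point is that a structured check rejects names such as `myTable__uploaded`, which would pass a `split("__")` plus `length > 1` test and then fail in `Integer.parseInt`.

```java
public class LlcNameSketch {
  // Hypothetical stand-in for a structured segment-name parser; NOT Pinot's LLCSegmentName.
  // Assumed name layout: <table>__<partitionGroupId>__<sequenceNumber>__<creationTime>
  static boolean isLlcSegment(String segmentName) {
    String[] parts = segmentName.split("__");
    return parts.length == 4 && isDigits(parts[1]) && isDigits(parts[2]);
  }

  // Returns the partition id encoded in an LLC-style name, or throws if the name is not LLC-style.
  static int partitionGroupId(String segmentName) {
    if (!isLlcSegment(segmentName)) {
      throw new IllegalArgumentException("Not an LLC-style segment name: " + segmentName);
    }
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  private static boolean isDigits(String s) {
    return !s.isEmpty() && s.chars().allMatch(Character::isDigit);
  }

  public static void main(String[] args) {
    // Illustrative names only; the second one would break a naive split-and-parse.
    String[] names = {"myTable__3__42__20250304T1200Z", "myTable__uploaded", "merged_myTable_0"};
    for (String name : names) {
      if (isLlcSegment(name)) {
        System.out.println(name + " -> partition " + partitionGroupId(name));
      } else {
        System.out.println(name + " -> no partition in name, handle separately");
      }
    }
  }
}
```

In the PR itself, the equivalent guard would presumably be the `LLCSegmentName.isLLCSegment()` check, with `getPartitionGroupId()` replacing the manual index into the split array.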



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +565,46 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
       //    if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
       //    not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
       //    records correctly.
-      long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+      //
+      // Based on the following considerations:
+      // 1. The BufferTime configuration will do the work of NOT merging the most recent segments, it will cover most
+      //    of the cases
+      // 2. If one wants to merge the most recent segments, and hence changes the BufferTime to 0, we need to make sure
+      //    we do NOT merge the consuming segments
+      // 3. There is a corner case of PauseConsumption, the function will seal the consuming segments and NOT create
+      //    a new one, nor upload the offset metadata.
+      // We decide to ONLY filter out the consuming segments and most recent completed segments for each partition.
+      Map<Integer, SegmentZKMetadata> latestCompletedSegmentInEachPartition = new HashMap<>();
+      List<SegmentZKMetadata> filteredSegments = new ArrayList<>();
       for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        if (!segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getTotalDocs() > 0
-            && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-          earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+        if (segmentZKMetadata.getStatus().isCompleted()) {
+          // completed segments
+          String[] segmentIdComponents = segmentZKMetadata.getSegmentName().split("__");
+          if (segmentIdComponents.length > 1) {
+            // realtime segments
+            int partitionId = Integer.parseInt(segmentIdComponents[1]);
+            if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+              // latest
+              latestCompletedSegmentInEachPartition.put(partitionId, segmentZKMetadata);
+            } else {
+              long currentOffset = Long.parseLong(segmentZKMetadata.getEndOffset());
+              long maxOffset = Long.parseLong(latestCompletedSegmentInEachPartition.get(partitionId).getEndOffset());
+              if (currentOffset > maxOffset) {
+                // latest
+                filteredSegments.add(latestCompletedSegmentInEachPartition.get(partitionId));
+                latestCompletedSegmentInEachPartition.put(partitionId, segmentZKMetadata);
+              } else {
+                // not latest
+                filteredSegments.add(segmentZKMetadata);
+              }
+            }
+          } else {
+            // not-realtime segments

Review Comment:
   (nit) In this case, they are still realtime segments, but either uploaded or merged/rolled up. Maybe update the comment to make it clearer?
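
For reference, a simplified sketch of the branch in question. Everything here is a hypothetical stand-in (`Seg` is not `SegmentZKMetadata`), and because the hunk above is cut off at the `else` branch, the sketch assumes that completed segments without an LLC-style name are simply kept as merge candidates. It shows which segments end up in the filtered list: consuming segments and the latest completed segment of each partition are held back, while uploaded or merged segments of the realtime table pass through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatestPerPartitionSketch {
  // Hypothetical, simplified stand-in for segment metadata; NOT Pinot's SegmentZKMetadata.
  static final class Seg {
    final String name;
    final boolean completed;
    final long endOffset;

    Seg(String name, boolean completed, long endOffset) {
      this.name = name;
      this.completed = completed;
      this.endOffset = endOffset;
    }
  }

  // Returns the partition id for LLC-style names (assumed layout
  // <table>__<partition>__<sequence>__<creationTime>), or -1 otherwise.
  static int partitionOf(String name) {
    String[] p = name.split("__");
    if (p.length == 4 && p[1].matches("\\d+") && p[2].matches("\\d+")) {
      return Integer.parseInt(p[1]);
    }
    return -1;
  }

  static List<String> mergeCandidates(List<Seg> all) {
    Map<Integer, Seg> latest = new HashMap<>();
    List<String> out = new ArrayList<>();
    for (Seg s : all) {
      if (!s.completed) {
        continue; // consuming segments are never merged
      }
      int partition = partitionOf(s.name);
      if (partition < 0) {
        // Still a segment of the realtime table, but uploaded or merged/rolled up.
        // Assumption: such segments are kept as candidates.
        out.add(s.name);
        continue;
      }
      Seg prev = latest.get(partition);
      if (prev == null) {
        latest.put(partition, s);
      } else if (s.endOffset > prev.endOffset) {
        out.add(prev.name); // previous latest is no longer the latest
        latest.put(partition, s);
      } else {
        out.add(s.name);
      }
    }
    return out;
  }

  // Small illustrative input with made-up segment names and offsets.
  static List<String> demo() {
    return mergeCandidates(Arrays.asList(
        new Seg("t__0__0__20250301T0000Z", true, 100L),
        new Seg("t__0__1__20250301T0100Z", true, 200L),
        new Seg("t__0__2__20250301T0200Z", false, 0L),
        new Seg("t__1__0__20250301T0000Z", true, 50L),
        new Seg("merged_t_0", true, 0L)));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A comment such as "uploaded or merged segments of the realtime table" on that branch would describe this case more accurately than "not-realtime segments".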



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
