Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r468077683



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) {
+            // If the lineage state is 'IN_PROGRESS', we need to clean up the zombie lineage entry and its segments
+            if (lineageEntry.getTimestamp() < System.currentTimeMillis() - LINEAGE_ENTRY_CLEANUP_RETENTION_IN_MILLIS) {
+              segmentsToDelete.addAll(lineageEntry.getSegmentsTo());
+
+              // The lineage entry with 'IN_PROGRESS' state can only be safely removed when segmentTo are all removed
+              // from the table. Deleting lineage will allow the task scheduler to re-schedule the source segments to
+              // be merged again.
+              if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+                segmentLineage.deleteLineageEntry(lineageEntryId);
+              }
+            }
+          }
+        }
+
+        // Delete segments based on the segment lineage
+        _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete);

Review comment:
       We might want to delete the segments only after the lineage has been
successfully written back; otherwise the segments might be deleted without the
lineage being updated.
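
A minimal sketch of that ordering, not the actual controller code. `VersionedLineageStore` and `SegmentDeleter` are hypothetical in-memory stand-ins for the property store write and for `deleteSegments`; the only point is that a failed version-checked write returns early, so nothing is deleted and the retry policy can run the attempt again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LineageCleanupOrdering {
  /** Hypothetical stand-in for the versioned lineage record in the property store. */
  static class VersionedLineageStore {
    private final AtomicInteger _version = new AtomicInteger(0);

    int getVersion() {
      return _version.get();
    }

    /** Succeeds only if the record has not been updated since expectedVersion was read. */
    boolean writeLineage(int expectedVersion) {
      return _version.compareAndSet(expectedVersion, expectedVersion + 1);
    }
  }

  /** Hypothetical stand-in for the segment deletion call; it only records what was deleted. */
  static class SegmentDeleter {
    final List<String> _deleted = new ArrayList<>();

    void deleteSegments(List<String> segments) {
      _deleted.addAll(segments);
    }
  }

  /**
   * One cleanup attempt: write the lineage first and delete segments only if the write succeeded.
   * Returning false lets a retry policy re-run the attempt against fresh state.
   */
  static boolean attemptCleanup(VersionedLineageStore store, SegmentDeleter deleter, int expectedVersion,
      List<String> segmentsToDelete) {
    if (!store.writeLineage(expectedVersion)) {
      return false;
    }
    deleter.deleteSegments(segmentsToDelete);
    return true;
  }

  public static void main(String[] args) {
    VersionedLineageStore store = new VersionedLineageStore();
    SegmentDeleter deleter = new SegmentDeleter();
    List<String> segmentsToDelete = Arrays.asList("seg_0", "seg_1");

    int staleVersion = store.getVersion();
    store.writeLineage(staleVersion); // simulate a concurrent lineage update
    boolean first = attemptCleanup(store, deleter, staleVersion, segmentsToDelete);
    System.out.println(first + " " + deleter._deleted); // prints "false []"

    boolean second = attemptCleanup(store, deleter, store.getVersion(), segmentsToDelete);
    System.out.println(second + " " + deleter._deleted); // prints "true [seg_0, seg_1]"
  }
}
```

With the current order (delete, then write), the first attempt above would already have removed both segments even though the lineage update was rejected.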

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,63 @@ private boolean shouldDeleteInProgressLLCSegment(String segmentName, IdealState
       return states.size() == 1 && states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            .getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          return true;
+        }
+        SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // 1. The original segments can be deleted once the merged segments are successfully uploaded
+        // 2. The zombie lineage entry & merged segments should be deleted if the segment replacement failed in
+        //    the middle
+        Set<String> segmentsForTable = new HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+        List<String> segmentsToDelete = new ArrayList<>();
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // If the lineage state is 'COMPLETED', it is safe to delete all segments from 'segmentsFrom'
+            segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());
+
+            // The lineage entry with 'COMPLETED' state can only be safely removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }

Review comment:
       We can simplify the logic as follows; the same applies to `IN_PROGRESS`:
   ```suggestion
               Set<String> sourceSegments = new HashSet<>(lineageEntry.getSegmentsFrom());
               sourceSegments.retainAll(segmentsForTable);
               if (sourceSegments.isEmpty()) {
                 segmentLineage.deleteLineageEntry(lineageEntryId);
               } else {
                 segmentsToDelete.addAll(sourceSegments);
               }
   ```
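
A self-contained sketch of how the same pattern could cover both states, assuming the `IN_PROGRESS` branch would intersect `getSegmentsTo()` instead of `getSegmentsFrom()`. The helper `processEntry` and the segment names are hypothetical and only illustrate the `retainAll` step. Note that, like the suggestion, it checks a single segment list per entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LineageRetainAllSketch {
  /**
   * Mirrors the suggested logic for one lineage entry. Returns true if the entry can be removed
   * (none of its candidate segments remain in the table); otherwise queues only the candidates
   * that still exist for deletion and returns false.
   */
  static boolean processEntry(List<String> candidateSegments, Set<String> segmentsForTable,
      List<String> segmentsToDelete) {
    Set<String> remaining = new HashSet<>(candidateSegments);
    remaining.retainAll(segmentsForTable);
    if (remaining.isEmpty()) {
      return true;
    }
    segmentsToDelete.addAll(remaining);
    return false;
  }

  public static void main(String[] args) {
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("seg_1", "merged_0"));
    List<String> segmentsToDelete = new ArrayList<>();

    // COMPLETED entry: candidates are the source segments; seg_0 is already gone from the table.
    boolean removeCompleted = processEntry(Arrays.asList("seg_0", "seg_1"), segmentsForTable, segmentsToDelete);
    System.out.println(removeCompleted + " " + segmentsToDelete); // prints "false [seg_1]"

    // Expired IN_PROGRESS entry: candidates are the merged segments; none are left in the table.
    boolean removeInProgress = processEntry(Arrays.asList("merged_9"), segmentsForTable, segmentsToDelete);
    System.out.println(removeInProgress + " " + segmentsToDelete); // prints "true [seg_1]"
  }
}
```

Compared with `addAll` followed by `Collections.disjoint`, this queues only segments that still exist in the table, so segments that are already gone are not passed to the delete call again.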




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



