Jackie-Jiang commented on a change in pull request #5828:
URL: https://github.com/apache/incubator-pinot/pull/5828#discussion_r467174357



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely 
removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, 
lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) 
{
+            // Zombie lineage entry is safe to remove. This will allow the 
task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), 
segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. 
(tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed 
for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment 
lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are 
successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be 
a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie 
segments.
+   */
+  private List<String> 
computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {

Review comment:
       Remove the null check, which is redundant.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);

Review comment:
       Don't log this, as it can flood the log in a large cluster.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       This won't give you the up-to-date segment list, because we delay the 
deletion of segment ZK metadata.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely 
removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, 
lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) 
{
+            // Zombie lineage entry is safe to remove. This will allow the 
task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);

Review comment:
       This part is wrong. It will remove lineage entries that are genuinely still in progress, not just zombie ones.

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());

Review comment:
       Don't use `Assert`, use `Preconditions` instead

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));
+
+        // Clean up the segment lineage
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
+          if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+            // The lineage entry for 'COMPLETED' state can only be safely 
removed when both segmentFrom & segmentTo
+            // are all removed from the table.
+            if (Collections.disjoint(segmentsForTable, 
lineageEntry.getSegmentsFrom()) && Collections
+                .disjoint(segmentsForTable, lineageEntry.getSegmentsTo())) {
+              segmentLineage.deleteLineageEntry(lineageEntryId);
+            }
+          } else if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS) 
{
+            // Zombie lineage entry is safe to remove. This will allow the 
task scheduler to re-schedule the
+            // source segments to be merged again.
+            segmentLineage.deleteLineageEntry(lineageEntryId);
+          }
+        }
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper
+            
.writeSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), 
segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String.format("Failed to clean up the segment lineage. 
(tableName = %s)", tableNameWithType);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+    LOGGER.info("Segment lineage metadata clean-up is successfully processed 
for table: {}", tableNameWithType);
+  }
+
+  /**
+   * Compute the segments that can be safely deleted based on the segment 
lineage.
+   *
+   * 1. The original segments can be deleted once the merged segments are 
successfully uploaded.
+   * 2. If the segmentReplacement operation fails in the middle, there can be 
a case where partial segments are
+   *    uploaded to the table. We should periodically clean up those zombie 
segments.
+   */
+  private List<String> 
computeSegmentsToDeleteFromSegmentLineage(SegmentLineage segmentLineage) {
+    if (segmentLineage != null) {
+      List<String> segmentsToDelete = new ArrayList<>();
+      for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+        LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(lineageEntryId);
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          // If the lineage state is 'COMPLETED', it is safe to delete all 
segments from 'segmentsFrom'
+          segmentsToDelete.addAll(lineageEntry.getSegmentsFrom());

Review comment:
       Can we directly remove the lineage entry here? Why do we need to wait 
for all merged segments to be removed as well?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
##########
@@ -177,4 +193,87 @@ private boolean shouldDeleteInProgressLLCSegment(String 
segmentName, IdealState
       return states.size() == 1 && 
states.contains(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE);
     }
   }
+
+  private void manageSegmentLineageCleanupForTable(String tableNameWithType) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment lineage
+        ZNRecord segmentLineageZNRecord = SegmentLineageAccessHelper
+            
.getSegmentLineageZNRecord(_pinotHelixResourceManager.getPropertyStore(), 
tableNameWithType);
+        if (segmentLineageZNRecord == null) {
+          LOGGER.info("Segment lineage does not exist for table: {}", 
tableNameWithType);
+          return true;
+        }
+        SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        int expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Delete segments based on the segment lineage
+        PinotResourceManagerResponse response = _pinotHelixResourceManager
+            .deleteSegments(tableNameWithType, 
computeSegmentsToDeleteFromSegmentLineage(segmentLineage));
+
+        Assert.assertTrue(response.isSuccessful());
+
+        // Fetch available segments for the table
+        Set<String> segmentsForTable = new 
HashSet<>(_pinotHelixResourceManager.getSegmentsFor(tableNameWithType));

Review comment:
       The lineage entry deletion can be handled together with the segment deletion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to