This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 90fc9285b3 Ensure SegmentDeletionManager deletes deepstore files created by BaseMultipleSegmentsConversionExecutor (#15048)
90fc9285b3 is described below

commit 90fc9285b3e34e231c6ba3574a4b414cf7596ac4
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Fri Feb 14 11:01:14 2025 +0530

    Ensure SegmentDeletionManager deletes deepstore files created by BaseMultipleSegmentsConversionExecutor (#15048)
    
    * Rely on Segment ZK download url for deleting segment in the deletion manager and use the existing naming scheme only as a backup
    
    * Fixing integration tests
    
    * Changing the approach to using fixed extension instead of relying on reading segment ZK metadata
    
    * Minor improvements in logging
    
    * add logs
    
    * Improving java docs
    
    * Fix java docs and minor logs
---
 .../helix/core/SegmentDeletionManager.java         | 44 +++++++++-
 .../pinot/controller/api/TableViewsTest.java       |  9 +-
 .../PinotHelixResourceManagerStatelessTest.java    | 98 +++++++++++++++-------
 .../core/util/SegmentDeletionManagerTest.java      | 87 +++++++++++++++++++
 4 files changed, 205 insertions(+), 33 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 5a7012657e..d1d5544be7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.LeadControllerManager;
 import 
org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager;
@@ -235,7 +236,11 @@ public class SegmentDeletionManager {
       long retentionMs = deletedSegmentsRetentionMs == null
           ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
       String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, 
URIUtils.encode(segmentId));
+      URI fileToDeleteURI = getFileToDeleteURI(rawTableName, segmentId);
+      if (fileToDeleteURI == null) {
+        LOGGER.warn("No segment file found for segment: {} in deep store, 
skipping deletion", segmentId);
+        return;
+      }
       PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
       // Segment metadata in remote store is an optimization, to avoid 
downloading segment to parse metadata.
       // This is catch all clean up to ensure that metadata is removed from 
deep store.
@@ -282,6 +287,43 @@ public class SegmentDeletionManager {
     }
   }
 
+  /**
+   * Retrieves the URI for segment deletion by checking two possible segment 
file variants in deep store.
+   * Looks for the segment file in two formats:
+   * - Without extension (conventional naming)
+   * - With .tar.gz extension (used by minions in 
BaseMultipleSegmentsConversionExecutor)
+   *
+   * @param rawTableName name of the table containing the segment
+   * @param segmentId name of the segment
+   * @return URI of the existing segment file if found in either format, null 
if segment doesn't exist in either format
+   *         or if there are filesystem access errors
+   */
+  @Nullable
+  private URI getFileToDeleteURI(String rawTableName, String segmentId) {
+    try {
+      URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName, 
URIUtils.encode(segmentId));
+      PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme());
+
+      // Check for plain segment file first
+      if (pinotFS.exists(plainFileUri)) {
+        return plainFileUri;
+      }
+
+      URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName,
+          URIUtils.encode(segmentId + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
+
+      // Check for .tar.gz segment file
+      if (pinotFS.exists(tarGzFileUri)) {
+        return tarGzFileUri;
+      }
+      LOGGER.error("No file found for segment: {} in deep store", segmentId);
+      return null;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while trying to find file for segment: {} 
in deep store", segmentId);
+      return null;
+    }
+  }
+
   /**
    * Removes aged deleted segments from the deleted directory
    */
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 639542eab1..7df080df26 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -22,6 +22,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
 import org.apache.helix.InstanceType;
+import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.api.resources.TableViews;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
@@ -63,7 +64,9 @@ public class TableViewsTest extends ControllerTest {
     DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig);
     DEFAULT_INSTANCE.getHelixResourceManager()
         
.addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME),
-            SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
OFFLINE_SEGMENT_NAME), "downloadUrl");
+            SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
OFFLINE_SEGMENT_NAME),
+            
getDownloadURL(DEFAULT_INSTANCE.getHelixResourceManager().getDataDir(), 
OFFLINE_TABLE_NAME,
+                OFFLINE_SEGMENT_NAME));
 
     // Create the hybrid table
     DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME);
@@ -168,6 +171,10 @@ public class TableViewsTest extends ControllerTest {
         TableViews.TableView.class);
   }
 
+  private String getDownloadURL(String controllerDataDir, String rawTableName, 
String segmentId) {
+    return URIUtils.getUri(controllerDataDir, rawTableName, 
URIUtils.encode(segmentId)).toString();
+  }
+
   @AfterClass
   public void tearDown() {
     DEFAULT_INSTANCE.cleanup();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 095865d923..dda542d3d2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -983,9 +984,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
             new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"), null)));
     // Try after new segments added to the table
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s20"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s20"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s20"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s21"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s21"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId,
         new EndReplaceSegmentsRequest(Arrays.asList("s21"), null));
     SegmentLineage segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
@@ -1012,7 +1015,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Add 5 segments
     for (int i = 0; i < 5; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
     assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false).size(), 5);
 
@@ -1057,9 +1061,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Try after new segments added to the table
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s5"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s5"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s5"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s6"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId1, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds(), 
Collections.singleton(lineageEntryId1));
@@ -1080,7 +1086,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t1_0"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t1_0"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t1_0"));
     IdealState idealState = 
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
     assertNotNull(idealState);
     assertTrue(idealState.getPartitionSet().contains("merged_t1_0"));
@@ -1112,7 +1119,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t2_0"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t2_0"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t2_0"));
 
     // Without force cleanup, 'startReplaceSegments' again should fail because 
of duplicate segments on 'segmentFrom'
     List<String> segmentsFrom4 = Arrays.asList("s1", "s2");
@@ -1140,9 +1148,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload segments again
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t3_0"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t3_0"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_0"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t3_1"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_t3_1"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_1"));
 
     // Finish the replacement
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId4, null);
@@ -1181,7 +1191,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s7"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s7"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s7"));
 
     // Start another new segment replacement with empty segmentsFrom, and 
check that previous lineages with empty
     // segmentsFrom are not reverted
@@ -1196,9 +1207,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Finish the replacement
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s9"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s9"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s9"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s10"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s10"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s10"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId7, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
@@ -1221,7 +1234,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s11"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s11"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s11"));
 
     // Start another new segment replacement with segmentsFrom overlapping 
with previous lineage, and check that
     // previous lineages with overlapped segmentsFrom are reverted
@@ -1236,9 +1250,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Finish the replacement
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s13"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s13"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s13"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s14"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s14"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s14"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId9, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
     assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
@@ -1286,7 +1302,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Add 3 segments
     for (int i = 0; i < 3; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
     List<String> segmentsForTable = 
_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false);
     assertEquals(segmentsForTable.size(), 3);
@@ -1306,7 +1323,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Add new segments
     for (int i = 3; i < 6; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false), "s0", "s1", "s2", "s3", "s4",
         "s5");
@@ -1342,7 +1360,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Add partial segments to indicate incomplete protocol
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s6"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6"));
     assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false).size(), 7);
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
true), "s3", "s4", "s5");
 
@@ -1375,7 +1394,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Add new segments
     for (int i = 9; i < 12; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
 
     // Call end segment replacements
@@ -1396,7 +1416,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Re-upload (s9, s10, s11) to test the segment clean up from 
startReplaceSegments
     for (int i = 9; i < 12; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
     assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, 
false), "s3", "s4", "s5", "s9", "s10",
         "s11");
@@ -1411,7 +1432,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     // Upload the new segments (s12, s13, s14)
     for (int i = 12; i < 15; i++) {
       _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i), "downloadUrl");
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" 
+ i),
+          getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i));
     }
 
     // Call endReplaceSegments to start to use (s12, s13, s14)
@@ -1433,7 +1455,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s15"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s15"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s15"));
 
     // Start another new segment replacement with empty segmentsFrom, and 
check that previous lineages with empty
     // segmentsFrom are not reverted
@@ -1447,9 +1470,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Finish the replacement
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s17"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s17"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s17"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s18"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s18"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s18"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId6, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
     
assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(), 
segmentsFrom6);
@@ -1469,7 +1494,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload partial data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s19"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s19"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s19"));
 
     // Start another new segment replacement with segmentsFrom overlapping 
with previous lineage, and check that
     // previous lineages with overlapped segmentsFrom are reverted
@@ -1483,9 +1509,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Finish the replacement
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s21"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s21"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s22"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s22"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s22"));
 
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId8, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
@@ -1506,9 +1534,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Upload data
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s23"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s23"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s23"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s24"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s24"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24"));
 
     // Start another new segment replacement with segmentsTo overlapping with 
previous lineage, and check that previous
     // lineages with overlapped segmentsTo are reverted
@@ -1526,9 +1556,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
 
     // Finish the replacement
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s24"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s24"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24"));
     _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s25"), "downloadUrl");
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"s25"),
+        getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s25"));
     _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, 
lineageEntryId10, null);
     segmentLineage = 
SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, 
OFFLINE_TABLE_NAME);
     
assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsFrom(),
 segmentsFrom10);
@@ -1541,6 +1573,10 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
     assertNull(segmentLineage);
   }
 
+  private String getDownloadURL(String controllerDataDir, String rawTableName, 
String segmentId) {
+    return URIUtils.getUri(controllerDataDir, rawTableName, 
URIUtils.encode(segmentId)).toString();
+  }
+
   private static void assertSetEquals(Collection<String> actual, String... 
expected) {
     Set<String> actualSet = actual instanceof Set ? (Set<String>) actual : new 
HashSet<>(actual);
     assertEquals(actualSet, new HashSet<>(Arrays.asList(expected)));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index e600643934..01cfa1ae76 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -33,11 +33,13 @@ import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
@@ -54,6 +56,7 @@ import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.common.metadata.ZKMetadataProvider.constructPropertyStorePathForSegment;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -352,6 +355,76 @@ public class SegmentDeletionManagerTest {
     }, 2000L, 10_000L, "Unable to verify table deletion with retention");
   }
 
+
+  @Test
+  public void testSegmentDeletionLogicWithFileWithGZExtension()
+      throws Exception {
+    Map<String, Object> properties = new HashMap<>();
+    
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY 
+ ".class",
+        LocalPinotFS.class.getName());
+    PinotFSFactory.init(new PinotConfiguration(properties));
+
+    HelixAdmin helixAdmin = makeHelixAdmin();
+    ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore();
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    SegmentDeletionManager deletionManager = new SegmentDeletionManager(
+        tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7);
+
+    // create table segment files.
+    Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
+    createTableAndSegmentFilesWithGZExtension(tempDir, 
segmentsThatShouldBeDeleted());
+    final File tableDir = new File(tempDir.getAbsolutePath() + File.separator 
+ TABLE_NAME);
+    final File deletedTableDir = new File(tempDir.getAbsolutePath() + 
File.separator + "Deleted_Segments"
+        + File.separator + TABLE_NAME);
+
+    // mock returning ZK Metadata for segment url
+    ZNRecord znRecord1 = mock(org.apache.helix.ZNRecord.class);
+    ZNRecord znRecord2 = mock(org.apache.helix.ZNRecord.class);
+    ZNRecord znRecord3 = mock(org.apache.helix.ZNRecord.class);
+    List<ZNRecord> znRecordList = List.of(znRecord1, znRecord2, znRecord3);
+    for (int i = 0; i < 3; i++) {
+      
when(znRecordList.get(i).getSimpleFields()).thenReturn(Map.of(CommonConstants.Segment.DOWNLOAD_URL,
+          tableDir.getAbsolutePath() + File.separator + 
segmentsThatShouldBeDeleted().get(i)
+              + TarCompressionUtils.TAR_GZ_FILE_EXTENSION));
+      when(propertyStore.get(constructPropertyStorePathForSegment(TABLE_NAME, 
segmentsThatShouldBeDeleted().get(i)),
+          null, AccessOption.PERSISTENT)).thenReturn(znRecordList.get(i));
+    }
+
+    // delete the segments instantly.
+    SegmentsValidationAndRetentionConfig mockValidationConfig = 
mock(SegmentsValidationAndRetentionConfig.class);
+    
when(mockValidationConfig.getDeletedSegmentsRetentionPeriod()).thenReturn("0d");
+    TableConfig mockTableConfig = mock(TableConfig.class);
+    
when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
+    deletionManager.deleteSegments(TABLE_NAME, segments, mockTableConfig);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertTrue(!deletedTableDir.exists() || 
deletedTableDir.listFiles().length == 0);
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+
+    // create table segment files again to test default retention.
+    createTableAndSegmentFilesWithGZExtension(tempDir, 
segmentsThatShouldBeDeleted());
+    // delete the segments with default retention
+    deletionManager.deleteSegments(TABLE_NAME, segments);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        Assert.assertEquals(tableDir.listFiles().length, 0);
+        Assert.assertEquals(deletedTableDir.listFiles().length, 
segments.size());
+        return true;
+      } catch (Throwable t) {
+        return false;
+      }
+    }, 2000L, 10_000L, "Unable to verify table deletion with retention");
+  }
+
+
   public void createTableAndSegmentFiles(File tempDir, List<String> segmentIds)
       throws Exception {
     File tableDir = new File(tempDir.getAbsolutePath() + File.separator + 
TABLE_NAME);
@@ -364,6 +437,20 @@ public class SegmentDeletionManagerTest {
     }
   }
 
+  public void createTableAndSegmentFilesWithGZExtension(File tempDir, 
List<String> segmentIds)
+      throws Exception {
+    File tableDir = new File(tempDir.getAbsolutePath() + File.separator + 
TABLE_NAME);
+    tableDir.mkdir();
+    for (String segmentId : segmentIds) {
+      createTestFileWithAge(
+          tableDir.getAbsolutePath() + File.separator + segmentId + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION, 0);
+      // Create segment metadata file
+      createTestFileWithAge(
+          tableDir.getAbsolutePath() + File.separator + segmentId + 
Constants.METADATA_TAR_GZ_FILE_EXT, 0);
+    }
+  }
+
+
   public String genDeletedSegmentName(String fileName, int age, int 
retentionInDays) {
     // adding one more hours to the deletion time just to make sure the test 
goes pass the retention period because
     // we no longer keep second level info in the date format.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to