This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 90fc9285b3 Ensure SegmentDeletionManager deletes deepstore files created by BaseMultipleSegmentsConversionExecutor (#15048) 90fc9285b3 is described below commit 90fc9285b3e34e231c6ba3574a4b414cf7596ac4 Author: 9aman <35227405+9a...@users.noreply.github.com> AuthorDate: Fri Feb 14 11:01:14 2025 +0530 Ensure SegmentDeletionManager deletes deepstore files created by BaseMultipleSegmentsConversionExecutor (#15048) * Rely on Segment ZK download url for deleting segment in the deletion manager and use the existing naming scheme only as a backup * Fixing integration tests * Changing the approach to using fixed extension instead of relying on reading segment ZK metadata * Minor improvements in logging * add logs * Improving java docs * Fix java docs and minor logs --- .../helix/core/SegmentDeletionManager.java | 44 +++++++++- .../pinot/controller/api/TableViewsTest.java | 9 +- .../PinotHelixResourceManagerStatelessTest.java | 98 +++++++++++++++------- .../core/util/SegmentDeletionManagerTest.java | 87 +++++++++++++++++++ 4 files changed, 205 insertions(+), 33 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index 5a7012657e..d1d5544be7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -43,6 +43,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager; @@ -235,7 +236,11 @@ public class SegmentDeletionManager { long retentionMs = deletedSegmentsRetentionMs == null ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs; String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); + URI fileToDeleteURI = getFileToDeleteURI(rawTableName, segmentId); + if (fileToDeleteURI == null) { + LOGGER.warn("No segment file found for segment: {} in deep store, skipping deletion", segmentId); + return; + } PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme()); // Segment metadata in remote store is an optimization, to avoid downloading segment to parse metadata. // This is catch all clean up to ensure that metadata is removed from deep store. @@ -282,6 +287,43 @@ public class SegmentDeletionManager { } } + /** + * Retrieves the URI for segment deletion by checking two possible segment file variants in deep store. + * Looks for the segment file in two formats: + * - Without extension (conventional naming) + * - With .tar.gz extension (used by minions in BaseMultipleSegmentsConversionExecutor) + * + * @param rawTableName name of the table containing the segment + * @param segmentId name of the segment + * @return URI of the existing segment file if found in either format, null if segment doesn't exist in either format + * or if there are filesystem access errors + */ + @Nullable + private URI getFileToDeleteURI(String rawTableName, String segmentId) { + try { + URI plainFileUri = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); + PinotFS pinotFS = PinotFSFactory.create(plainFileUri.getScheme()); + + // Check for plain segment file first + if (pinotFS.exists(plainFileUri)) { + return plainFileUri; + } + + URI tarGzFileUri = URIUtils.getUri(_dataDir, rawTableName, + URIUtils.encode(segmentId + TarCompressionUtils.TAR_GZ_FILE_EXTENSION)); + + // Check for .tar.gz segment file + if (pinotFS.exists(tarGzFileUri)) { + return tarGzFileUri; + } + LOGGER.error("No file found for segment: {} in deep store", segmentId); + return null; + } catch (Exception e) { + LOGGER.error("Caught exception while trying to find file for segment: {} in deep store", segmentId); + return null; + } + } + /** * Removes aged deleted segments from the deleted directory */ diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java index 639542eab1..7df080df26 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java @@ -22,6 +22,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.Map; import org.apache.helix.InstanceType; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.api.resources.TableViews; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; @@ -63,7 +64,9 @@ public class TableViewsTest extends ControllerTest { DEFAULT_INSTANCE.getHelixResourceManager().addTable(tableConfig); DEFAULT_INSTANCE.getHelixResourceManager() .addNewSegment(TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME), - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME), + getDownloadURL(DEFAULT_INSTANCE.getHelixResourceManager().getDataDir(), OFFLINE_TABLE_NAME, + OFFLINE_SEGMENT_NAME)); // Create the hybrid table DEFAULT_INSTANCE.addDummySchema(HYBRID_TABLE_NAME); @@ -168,6 +171,10 @@ public class TableViewsTest extends ControllerTest { TableViews.TableView.class); } + private String getDownloadURL(String controllerDataDir, String rawTableName, String segmentId) { + return URIUtils.getUri(controllerDataDir, rawTableName, URIUtils.encode(segmentId)).toString(); + } + @AfterClass public void tearDown() { DEFAULT_INSTANCE.cleanup(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index 095865d923..dda542d3d2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -50,6 +50,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -983,9 +984,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { new EndReplaceSegmentsRequest(Arrays.asList("s9", "s6"), null))); // Try after new segments added to the table _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s20"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s20"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s20")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId, new EndReplaceSegmentsRequest(Arrays.asList("s21"), null)); SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); @@ -1012,7 +1015,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add 5 segments for (int i = 0; i < 5; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 5); @@ -1057,9 +1061,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Try after new segments added to the table _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s5"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s5"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s5")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId1, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); assertEquals(segmentLineage.getLineageEntryIds(), Collections.singleton(lineageEntryId1)); @@ -1080,7 +1086,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t1_0"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t1_0"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t1_0")); IdealState idealState = _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME); assertNotNull(idealState); assertTrue(idealState.getPartitionSet().contains("merged_t1_0")); @@ -1112,7 +1119,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t2_0"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t2_0"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t2_0")); // Without force cleanup, 'startReplaceSegments' again should fail because of duplicate segments on 'segmentFrom' List<String> segmentsFrom4 = Arrays.asList("s1", "s2"); @@ -1140,9 +1148,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload segments again _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t3_0"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t3_0"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_0")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t3_1"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_t3_1"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "merged_t3_1")); // Finish the replacement _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId4, null); @@ -1181,7 +1191,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s7"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s7"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s7")); // Start another new segment replacement with empty segmentsFrom, and check that previous lineages with empty // segmentsFrom are not reverted @@ -1196,9 +1207,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Finish the replacement _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s9"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s9"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s9")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s10"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s10"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s10")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId7, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); assertEquals(segmentLineage.getLineageEntryIds().size(), 6); @@ -1221,7 +1234,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s11"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s11"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s11")); // Start another new segment replacement with segmentsFrom overlapping with previous lineage, and check that // previous lineages with overlapped segmentsFrom are reverted @@ -1236,9 +1250,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Finish the replacement _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s13"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s13"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s13")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s14"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s14"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s14")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId9, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); assertEquals(segmentLineage.getLineageEntryIds().size(), 8); @@ -1286,7 +1302,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add 3 segments for (int i = 0; i < 3; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } List<String> segmentsForTable = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false); assertEquals(segmentsForTable.size(), 3); @@ -1306,7 +1323,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add new segments for (int i = 3; i < 6; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false), "s0", "s1", "s2", "s3", "s4", "s5"); @@ -1342,7 +1360,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add partial segments to indicate incomplete protocol _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s6"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s6")); assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), 7); assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true), "s3", "s4", "s5"); @@ -1375,7 +1394,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Add new segments for (int i = 9; i < 12; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } // Call end segment replacements @@ -1396,7 +1416,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Re-upload (s9, s10, s11) to test the segment clean up from startReplaceSegments for (int i = 9; i < 12; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } assertSetEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, false), "s3", "s4", "s5", "s9", "s10", "s11"); @@ -1411,7 +1432,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload the new segments (s12, s13, s14) for (int i = 12; i < 15; i++) { _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s" + i), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s" + i)); } // Call endReplaceSegments to start to use (s12, s13, s14) @@ -1433,7 +1455,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s15"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s15"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s15")); // Start another new segment replacement with empty segmentsFrom, and check that previous lineages with empty // segmentsFrom are not reverted @@ -1447,9 +1470,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Finish the replacement _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s17"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s17"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s17")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s18"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s18"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s18")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId6, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(), segmentsFrom6); @@ -1469,7 +1494,8 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload partial data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s19"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s19"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s19")); // Start another new segment replacement with segmentsFrom overlapping with previous lineage, and check that // previous lineages with overlapped segmentsFrom are reverted @@ -1483,9 +1509,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Finish the replacement _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s21"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s21")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s22"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s22"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s22")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId8, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); @@ -1506,9 +1534,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Upload data _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s23"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s23"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s23")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s24"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s24"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24")); // Start another new segment replacement with segmentsTo overlapping with previous lineage, and check that previous // lineages with overlapped segmentsTo are reverted @@ -1526,9 +1556,11 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { // Finish the replacement _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s24"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s24"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s24")); _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s25"), "downloadUrl"); + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "s25"), + getDownloadURL(_controllerDataDir, RAW_TABLE_NAME, "s25")); _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId10, null); segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_TABLE_NAME); assertEquals(segmentLineage.getLineageEntry(lineageEntryId10).getSegmentsFrom(), segmentsFrom10); @@ -1541,6 +1573,10 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest { assertNull(segmentLineage); } + private String getDownloadURL(String controllerDataDir, String rawTableName, String segmentId) { + return URIUtils.getUri(controllerDataDir, rawTableName, URIUtils.encode(segmentId)).toString(); + } + private static void assertSetEquals(Collection<String> actual, String... expected) { Set<String> actualSet = actual instanceof Set ? (Set<String>) actual : new HashSet<>(actual); assertEquals(actualSet, new HashSet<>(Arrays.asList(expected))); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java index e600643934..01cfa1ae76 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java @@ -33,11 +33,13 @@ import java.util.TimeZone; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; @@ -54,6 +56,7 @@ import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.pinot.common.metadata.ZKMetadataProvider.constructPropertyStorePathForSegment; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -352,6 +355,76 @@ public class SegmentDeletionManagerTest { }, 2000L, 10_000L, "Unable to verify table deletion with retention"); } + + @Test + public void testSegmentDeletionLogicWithFileWithGZExtension() + throws Exception { + Map<String, Object> properties = new HashMap<>(); + properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class", + LocalPinotFS.class.getName()); + PinotFSFactory.init(new PinotConfiguration(properties)); + + HelixAdmin helixAdmin = makeHelixAdmin(); + ZkHelixPropertyStore<ZNRecord> propertyStore = makePropertyStore(); + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + SegmentDeletionManager deletionManager = new SegmentDeletionManager( + tempDir.getAbsolutePath(), helixAdmin, CLUSTER_NAME, propertyStore, 7); + + // create table segment files. + Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted()); + createTableAndSegmentFilesWithGZExtension(tempDir, segmentsThatShouldBeDeleted()); + final File tableDir = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME); + final File deletedTableDir = new File(tempDir.getAbsolutePath() + File.separator + "Deleted_Segments" + + File.separator + TABLE_NAME); + + // mock returning ZK Metadata for segment url + ZNRecord znRecord1 = mock(org.apache.helix.ZNRecord.class); + ZNRecord znRecord2 = mock(org.apache.helix.ZNRecord.class); + ZNRecord znRecord3 = mock(org.apache.helix.ZNRecord.class); + List<ZNRecord> znRecordList = List.of(znRecord1, znRecord2, znRecord3); + for (int i = 0; i < 3; i++) { + when(znRecordList.get(i).getSimpleFields()).thenReturn(Map.of(CommonConstants.Segment.DOWNLOAD_URL, + tableDir.getAbsolutePath() + File.separator + segmentsThatShouldBeDeleted().get(i) + + TarCompressionUtils.TAR_GZ_FILE_EXTENSION)); + when(propertyStore.get(constructPropertyStorePathForSegment(TABLE_NAME, segmentsThatShouldBeDeleted().get(i)), + null, AccessOption.PERSISTENT)).thenReturn(znRecordList.get(i)); + } + + // delete the segments instantly. + SegmentsValidationAndRetentionConfig mockValidationConfig = mock(SegmentsValidationAndRetentionConfig.class); + when(mockValidationConfig.getDeletedSegmentsRetentionPeriod()).thenReturn("0d"); + TableConfig mockTableConfig = mock(TableConfig.class); + when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig); + deletionManager.deleteSegments(TABLE_NAME, segments, mockTableConfig); + + TestUtils.waitForCondition(aVoid -> { + try { + Assert.assertEquals(tableDir.listFiles().length, 0); + Assert.assertTrue(!deletedTableDir.exists() || deletedTableDir.listFiles().length == 0); + return true; + } catch (Throwable t) { + return false; + } + }, 2000L, 10_000L, "Unable to verify table deletion with retention"); + + // create table segment files again to test default retention. + createTableAndSegmentFilesWithGZExtension(tempDir, segmentsThatShouldBeDeleted()); + // delete the segments with default retention + deletionManager.deleteSegments(TABLE_NAME, segments); + + TestUtils.waitForCondition(aVoid -> { + try { + Assert.assertEquals(tableDir.listFiles().length, 0); + Assert.assertEquals(deletedTableDir.listFiles().length, segments.size()); + return true; + } catch (Throwable t) { + return false; + } + }, 2000L, 10_000L, "Unable to verify table deletion with retention"); + } + + public void createTableAndSegmentFiles(File tempDir, List<String> segmentIds) throws Exception { File tableDir = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME); @@ -364,6 +437,20 @@ public class SegmentDeletionManagerTest { } } + public void createTableAndSegmentFilesWithGZExtension(File tempDir, List<String> segmentIds) + throws Exception { + File tableDir = new File(tempDir.getAbsolutePath() + File.separator + TABLE_NAME); + tableDir.mkdir(); + for (String segmentId : segmentIds) { + createTestFileWithAge( + tableDir.getAbsolutePath() + File.separator + segmentId + TarCompressionUtils.TAR_GZ_FILE_EXTENSION, 0); + // Create segment metadata file + createTestFileWithAge( + tableDir.getAbsolutePath() + File.separator + segmentId + Constants.METADATA_TAR_GZ_FILE_EXT, 0); + } + } + + public String genDeletedSegmentName(String fileName, int age, int retentionInDays) { // adding one more hours to the deletion time just to make sure the test goes pass the retention period because // we no longer keep second level info in the date format. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org