This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7f123000ca3 Batch delete deleted directory segments (#16848)
7f123000ca3 is described below
commit 7f123000ca36a95fdd5a2203d3f45473a19cf9f3
Author: 9aman <[email protected]>
AuthorDate: Fri Sep 26 15:56:32 2025 +0530
Batch delete deleted directory segments (#16848)
* Batch delete deleted directory segments
* Changes to ensure async deletion
* Fixing unit tests
* Improve and fix test cases
* Minor improvements in testing
* Renaming variable to correctly capture the logic
* Add support for providing a custom batch size for aged deleted segments
deletion
* Fixing the hardcoded string to a constant
* Fixing failed test cases
* Increase the default deletion size to 1000 from 100
* Add timeout for batched deletion.
---
.../apache/pinot/controller/ControllerConf.java | 12 ++
.../helix/core/SegmentDeletionManager.java | 196 ++++++++++-----------
.../helix/core/retention/RetentionManager.java | 5 +-
.../helix/core/retention/RetentionManagerTest.java | 20 ++-
.../core/util/SegmentDeletionManagerTest.java | 116 +++++++++---
5 files changed, 213 insertions(+), 136 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8e0583ddee1..7a30e12e4e2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -274,6 +274,9 @@ public class ControllerConf extends PinotConfiguration {
public static final String UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS =
"controller.retentionManager.untrackedSegmentsRetentionTimeInDays";
public static final int DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS
= 3;
+ public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
+ "controller.retentionManager.agedSegmentsDeletionBatchSize";
+ public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND =
60 * 60; // 1 Hour.
@@ -1237,6 +1240,15 @@ public class ControllerConf extends PinotConfiguration {
setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION,
untrackedSegmentDeletionEnabled);
}
+ public int getAgedSegmentsDeletionBatchSize() {
+ return
getProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE,
+ ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
+ }
+
+ public void setAgedSegmentsDeletionBatchSize(int
agedSegmentsDeletionBatchSize) {
+ setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE,
agedSegmentsDeletionBatchSize);
+ }
+
public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 5638ebc16ce..e72ac521cec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -54,6 +54,7 @@ import
org.apache.pinot.core.segment.processing.lifecycle.impl.SegmentDeletionEv
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -71,14 +72,15 @@ public class SegmentDeletionManager {
// Retention date format will be written as suffix to deleted segments under
`Deleted_Segments` folder. for example:
//
`Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200`
to indicate that this segment
// file will be permanently deleted after Feb 2nd 2022 12PM.
- private static final String DELETED_SEGMENTS = "Deleted_Segments";
+ public static final String DELETED_SEGMENTS = "Deleted_Segments";
private static final String RETENTION_UNTIL_SEPARATOR =
"__RETENTION_UNTIL__";
private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
private static final SimpleDateFormat RETENTION_DATE_FORMAT;
private static final String DELIMITER = "/";
+ public static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 1000;
private static final int OBJECT_DELETION_TIMEOUT = 5;
- private static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 100;
+ private static final int MAX_BATCH_DELETION_TIMEOUT_SECONDS = 600; // 10
minutes
static {
RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
@@ -210,17 +212,6 @@ public class SegmentDeletionManager {
}
}
- public void removeSegmentsFromStore(String tableNameWithType, List<String>
segments) {
- removeSegmentsFromStore(tableNameWithType, segments, null);
- }
-
- public void removeSegmentsFromStore(String tableNameWithType, List<String>
segments,
- @Nullable Long deletedSegmentsRetentionMs) {
- for (String segment : segments) {
- removeSegmentFromStore(tableNameWithType, segment,
deletedSegmentsRetentionMs);
- }
- }
-
public void removeSegmentsFromStoreInBatch(String tableNameWithType,
List<String> segments,
@Nullable Long deletedSegmentsRetentionMs) {
if (_dataDir == null) {
@@ -270,49 +261,6 @@ public class SegmentDeletionManager {
}
}
- private void deleteSegmentMetadataFromStore(PinotFS pinotFS, URI
segmentFileUri, String segmentId) {
- // Check if segment metadata exists in remote store and delete it.
- // URI is generated from segment's location and segment name
- try {
- URI segmentMetadataUri =
SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(),
segmentId);
- if (pinotFS.exists(segmentMetadataUri)) {
- LOGGER.info("Deleting segment metadata: {} from: {}", segmentId,
segmentMetadataUri);
- if (!deleteWithTimeout(pinotFS, segmentMetadataUri, true,
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
- LOGGER.warn("Could not delete segment metadata: {} from: {}",
segmentId, segmentMetadataUri);
- }
- }
- } catch (IOException e) {
- LOGGER.warn("Could not delete segment metadata {} from {}", segmentId,
segmentFileUri, e);
- } catch (URISyntaxException e) {
- LOGGER.warn("Could not parse segment uri {}", segmentFileUri, e);
- }
- }
-
- protected void removeSegmentFromStore(String tableNameWithType, String
segmentId,
- @Nullable Long deletedSegmentsRetentionMs) {
- if (_dataDir != null) {
- long retentionMs = deletedSegmentsRetentionMs == null
- ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- URI fileToDeleteURI = getFileToDeleteURI(rawTableName, segmentId);
- if (fileToDeleteURI == null) {
- return;
- }
- PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
- // Segment metadata in remote store is an optimization, to avoid
downloading segment to parse metadata.
- // This is catch all clean up to ensure that metadata is removed from
deep store.
- deleteSegmentMetadataFromStore(pinotFS, fileToDeleteURI, segmentId);
- if (retentionMs <= 0) {
- // delete the segment file instantly if retention is set to zero
- segmentDeletion(segmentId, pinotFS, fileToDeleteURI);
- } else {
- moveSegmentsToDeletedDir(segmentId, deletedSegmentsRetentionMs,
rawTableName, pinotFS, fileToDeleteURI);
- }
- } else {
- LOGGER.info("dataDir is not configured, won't delete segment {} from
disk", segmentId);
- }
- }
-
private void moveSegmentsToDeletedDir(String segmentId, Long
deletedSegmentsRetentionMs, String rawTableName,
PinotFS pinotFS,
URI fileToDeleteURI) {
@@ -342,13 +290,6 @@ public class SegmentDeletionManager {
}
}
- private static void segmentDeletion(String segmentId, PinotFS pinotFS, URI
fileToDeleteURI) {
- if (deleteWithTimeout(pinotFS, fileToDeleteURI, true,
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
- LOGGER.info("Deleted segment: {} from: {}", segmentId,
fileToDeleteURI.toString());
- } else {
- LOGGER.warn("Failed to delete segment: {} from: {}", segmentId,
fileToDeleteURI.toString());
- }
- }
/**
* Retrieves the URI for segment deletion by checking two possible segment
file variants in deep store.
@@ -386,11 +327,26 @@ public class SegmentDeletionManager {
return null;
}
}
-
/**
- * Removes aged deleted segments from the deleted directory
+ * Removes aged deleted segments from the deleted directory using the
default batch size.
+ * This method processes aged segments in batches to avoid overwhelming the
system.
+ *
+ * @param leadControllerManager the lead controller manager to check if this
controller is the leader for tables
*/
public void removeAgedDeletedSegments(LeadControllerManager
leadControllerManager) {
+ removeAgedDeletedSegments(leadControllerManager,
NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT);
+ }
+
+ /**
+ * Removes aged deleted segments from the deleted directory with a custom
batch size.
+ * This method asynchronously deletes segments that have exceeded their
retention period.
+ * Only the leader controller for each table will perform the deletion to
avoid conflicts.
+ *
+ * @param leadControllerManager the lead controller manager to check if this
controller is the leader for tables
+ * @param agedSegmentsDeletionBatchSize the maximum number of aged segments
to process in a single batch
+ */
+ public void removeAgedDeletedSegments(LeadControllerManager
leadControllerManager,
+ int agedSegmentsDeletionBatchSize) {
if (_dataDir != null) {
URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
@@ -425,37 +381,74 @@ public class SegmentDeletionManager {
if (leadControllerManager.isLeaderForTable(tableName)) {
URI tableNameURI = URIUtils.getUri(deletedDirURI.toString(),
URIUtils.encode(tableName));
// Get files that are aged
- final String[] targetFiles = pinotFS.listFiles(tableNameURI,
false);
- int numFilesDeleted = 0;
- URI targetURI = null;
- for (String targetFile : targetFiles) {
+ final List<FileMetadata> targetFiles =
pinotFS.listFilesWithMetadata(tableNameURI, false);
+
+ if (targetFiles.isEmpty()) {
+ LOGGER.info("Deleting empty deleted segments directory: {} for
table: {}", tableNameURI, tableName);
try {
- targetURI =
- URIUtils.getUri(tableNameURI.toString(),
URIUtils.encode(URIUtils.getLastPart(targetFile)));
- long deletionTimeMs =
getDeletionTimeMsFromFile(targetURI.toString(),
pinotFS.lastModified(targetURI));
- if (System.currentTimeMillis() >= deletionTimeMs) {
- if (!deleteWithTimeout(pinotFS, targetURI, true,
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
- LOGGER.warn("Failed to remove resource: {}", targetURI);
- } else {
- numFilesDeleted++;
- if (numFilesDeleted >=
NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT) {
- LOGGER.info("Reached threshold of max aged segments to
delete per attempt. Deleted: {} files "
- + "from directory: {}", numFilesDeleted,
tableNameDir);
- break;
- }
- }
+ if (!pinotFS.delete(tableNameURI, false)) {
+ LOGGER.info("Could not delete deleted segments directory: {}
for table: {}", tableNameURI, tableName);
+ } else {
+ LOGGER.info("Successfully deleted deleted segments
directory: {} for table: {}", tableNameURI,
+ tableName);
}
} catch (Exception e) {
- LOGGER.warn("Failed to delete uri: {} for table: {}",
targetURI, tableName, e);
+ LOGGER.error("Exception occurred while deleting deleted
segments directory: {} for table: {}",
+ tableNameURI, tableName, e);
}
+ continue;
}
+ int numFilesScheduledForDeletion = 0;
+ URI targetURI = null;
+ List<URI> targetURIs = new ArrayList<>();
+ for (FileMetadata targetFile : targetFiles) {
+ // Some file system implementations also return the current
table directory
+ // we do not want to delete the table directory
+ if (targetFile.isDirectory()) {
+ continue;
+ }
- if (numFilesDeleted == targetFiles.length) {
- // Delete directory if it's empty
- if (!deleteWithTimeout(pinotFS, tableNameURI, false,
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
- LOGGER.warn("Failed to remove the directory: {}",
tableNameDir);
+ targetURI =
+ URIUtils.getUri(tableNameURI.toString(),
+
URIUtils.encode(URIUtils.getLastPart(targetFile.getFilePath())));
+ long deletionTimeMs =
getDeletionTimeMsFromFile(targetURI.toString(),
targetFile.getLastModifiedTime());
+ if (System.currentTimeMillis() >= deletionTimeMs) {
+ numFilesScheduledForDeletion++;
+ targetURIs.add(targetURI);
+ if (numFilesScheduledForDeletion ==
agedSegmentsDeletionBatchSize) {
+ LOGGER.info(
+ "Reached threshold of max aged segments to schedule for
deletion per attempt. Scheduling "
+ + "deletion of: {} segment files "
+ + "from directory: {}",
numFilesScheduledForDeletion, tableNameDir);
+ break;
+ }
}
}
+ try {
+ if (numFilesScheduledForDeletion > 0) {
+ LOGGER.info("Submitting request to delete: {} segment files
from directory: {}",
+ numFilesScheduledForDeletion, tableNameDir);
+ _executorService.submit(() -> {
+ try {
+ long timeoutSeconds =
Math.min(MAX_BATCH_DELETION_TIMEOUT_SECONDS,
+ (long) OBJECT_DELETION_TIMEOUT * targetURIs.size());
+ if (!deleteBatchWithTimeout(pinotFS, targetURIs, false,
timeoutSeconds, TimeUnit.SECONDS)) {
+ LOGGER.warn("Failed to delete aged segment files from
table: {}", tableName);
+ } else {
+ LOGGER.info("Successfully deleted {} aged segment files
from table: {}",
+ targetURIs.size(),
+ tableName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred while deleting aged
segments for table: {} from path: {}",
+ tableName, tableNameURI, e);
+ }
+ });
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to submit deletion task for segments in
table: {} from path: {}", tableName,
+ tableNameURI, e);
+ }
}
}
} catch (IOException e) {
@@ -466,32 +459,33 @@ public class SegmentDeletionManager {
}
}
- private static boolean deleteWithTimeout(PinotFS pinotFS, URI targetURI,
boolean forceDelete, long timeout,
- TimeUnit timeUnit) {
+ private static boolean deleteBatchWithTimeout(PinotFS pinotFS, List<URI>
targetURIs, boolean forceDelete,
+ long timeout, TimeUnit timeUnit) {
CompletableFuture<Boolean> deleteFuture = CompletableFuture.supplyAsync(()
-> {
- try {
- return pinotFS.delete(targetURI, forceDelete);
- } catch (IOException e) {
- LOGGER.warn("Error while deleting resource: {}", targetURI, e);
- return false;
- }
- });
-
+ try {
+ return pinotFS.deleteBatch(targetURIs, forceDelete);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to batch delete segments files", e);
+ return false;
+ }
+ }
+ );
try {
return deleteFuture.get(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.warn("Thread was interrupted while deleting resource: {}",
targetURI, e);
+ LOGGER.warn("Thread was interrupted while deleting resources", e);
return false;
} catch (TimeoutException e) {
- LOGGER.warn("Timeout occurred while deleting resource: {}", targetURI,
e);
+ LOGGER.warn("Timeout occurred while deleting resource", e);
return false;
} catch (ExecutionException e) {
- LOGGER.warn("Exception occurred while deleting resource: {}", targetURI,
e);
+ LOGGER.warn("Exception occurred while deleting resource", e);
return false;
}
}
+
private String getDeletedSegmentFileName(String fileName, long
deletedSegmentsRetentionMs) {
return fileName + RETENTION_UNTIL_SEPARATOR +
RETENTION_DATE_FORMAT.format(new Date(
System.currentTimeMillis() + deletedSegmentsRetentionMs));
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9dd9fe11c73..3d1bd43e418 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -81,6 +81,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
private static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
private final boolean _untrackedSegmentDeletionEnabled;
private final int _untrackedSegmentsRetentionTimeInDays;
+ private final int _agedSegmentsDeletionBatchSize;
private static final Logger LOGGER =
LoggerFactory.getLogger(RetentionManager.class);
private final boolean _isHybridTableRetentionStrategyEnabled;
@@ -94,6 +95,7 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
controllerMetrics);
_untrackedSegmentDeletionEnabled =
config.getUntrackedSegmentDeletionEnabled();
_untrackedSegmentsRetentionTimeInDays =
config.getUntrackedSegmentsRetentionTimeInDays();
+ _agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize();
_isHybridTableRetentionStrategyEnabled =
config.isHybridTableRetentionStrategyEnabled();
_brokerServiceHelper = brokerServiceHelper;
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}",
getIntervalInSeconds());
@@ -120,7 +122,8 @@ public class RetentionManager extends
ControllerPeriodicTask<Void> {
@Override
protected void postprocess() {
LOGGER.info("Removing aged deleted segments for all tables");
-
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_leadControllerManager);
+ _pinotHelixResourceManager.getSegmentDeletionManager()
+ .removeAgedDeletedSegments(_leadControllerManager,
_agedSegmentsDeletionBatchSize);
}
private void manageRetentionForTable(TableConfig tableConfig) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d5a1f29849d..ebe829231a0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -210,7 +210,8 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is called
- verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
+ verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
// Verify deleteSegments is called
verify(pinotHelixResourceManager,
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
@@ -410,7 +411,8 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is called
- verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
+ verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
// Verify deleteSegments is called
verify(pinotHelixResourceManager,
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
@@ -523,7 +525,9 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
mock(SegmentDeletionManager.class);
// Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we
only test that the call is made once per
// run of the retention manager
- doAnswer(invocationOnMock ->
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+ doAnswer(invocationOnMock -> null).when(deletionManager)
+ .removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
// If and when PinotHelixResourceManager.deleteSegments() is invoked, make
sure that the segments deleted
@@ -552,7 +556,9 @@ public class RetentionManagerTest {
when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
SegmentDeletionManager deletionManager =
mock(SegmentDeletionManager.class);
- doAnswer(invocationOnMock ->
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+ doAnswer(invocationOnMock -> null).when(deletionManager)
+ .removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
// Set up verification for deleteSegments with focus on the count and
segment inclusion rules
@@ -615,7 +621,8 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is actually called.
- verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
+ verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(),
anyList());
@@ -652,7 +659,8 @@ public class RetentionManagerTest {
SegmentDeletionManager deletionManager =
pinotHelixResourceManager.getSegmentDeletionManager();
// Verify that the removeAgedDeletedSegments() method in deletion manager
is actually called.
- verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager);
+ verify(deletionManager,
times(1)).removeAgedDeletedSegments(leadControllerManager,
+
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(),
anyList());
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index 3dcac7b01ea..2db31ac2049 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
@@ -48,6 +49,7 @@ import
org.apache.pinot.controller.helix.core.SegmentDeletionManager;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -244,7 +246,7 @@ public class SegmentDeletionManagerTest {
deletionManager.removeAgedDeletedSegments(leadControllerManager);
// Create deleted directory
- String deletedDirectoryPath = tempDir + File.separator +
"Deleted_Segments";
+ String deletedDirectoryPath = tempDir + File.separator +
SegmentDeletionManager.DELETED_SEGMENTS;
File deletedDirectory = new File(deletedDirectoryPath);
deletedDirectory.mkdir();
@@ -295,18 +297,26 @@ public class SegmentDeletionManagerTest {
deletionManager.removeAgedDeletedSegments(leadControllerManager);
// Check that only 1 day retention file is remaining
- Assert.assertEquals(dummyDir1.list().length, 1);
+ TestUtils.waitForCondition((aVoid) -> dummyDir1.list().length == 1, 1000,
100000,
+ "Unable to delete desired segments from dummyDir1");
+
+ // Check that empty directory has not been removed in the first run
+ TestUtils.waitForCondition((aVoid) -> dummyDir2.exists(), 1000, 100000,
+ "dummyDir2 does not exist");
- // Check that empty directory has successfully been removed.
- Assert.assertEquals(dummyDir2.exists(), false);
// Check that deleted file without retention suffix is honoring
cluster-wide retention period of 7 days.
- Assert.assertEquals(dummyDir3.list().length, 1);
+ TestUtils.waitForCondition((aVoid) -> dummyDir3.list().length == 1, 1000,
100000,
+ "Unable to delete desired segments from dummyDir3");
+
+ // Try to remove empty directory in the next run
+ deletionManager.removeAgedDeletedSegments(leadControllerManager);
+ Assert.assertFalse(dummyDir2.exists());
}
@Test
public void testRemoveDeletedSegmentsForGcsPinotFS()
- throws URISyntaxException, IOException {
+ throws URISyntaxException, IOException, InterruptedException {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY
+ ".class",
LocalPinotFS.class.getName());
@@ -323,9 +333,9 @@ public class SegmentDeletionManagerTest {
PinotFSFactory.register("fake", FakePinotFs.class.getName(), null);
PinotFS pinotFS = PinotFSFactory.create("fake");
- URI tableUri1 = new
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_1/");
+ URI tableUri1 = new URI("fake://bucket/sc/managed/pinot/" +
SegmentDeletionManager.DELETED_SEGMENTS + "/table_1/");
pinotFS.mkdir(tableUri1);
- for (int i = 0; i < 101; i++) {
+ for (int i = 0; i <
SegmentDeletionManager.NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT + 1; i++) {
URI segmentURIForTable =
new URI(tableUri1.getPath() + "segment" + i +
RETENTION_UNTIL_SEPARATOR + "201901010000");
pinotFS.mkdir(segmentURIForTable);
@@ -335,7 +345,7 @@ public class SegmentDeletionManagerTest {
pinotFS.mkdir(segmentURIForTable);
// Create dummy files
- URI tableUri2 = new
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_2/");
+ URI tableUri2 = new URI("fake://bucket/sc/managed/pinot/" +
SegmentDeletionManager.DELETED_SEGMENTS + "/table_2/");
URI segment1ForTable2 = new URI(tableUri2.getPath() + "segment1" +
RETENTION_UNTIL_SEPARATOR + "201901010000");
URI segment2ForTable2 = new URI(tableUri2.getPath() + "segment1" +
RETENTION_UNTIL_SEPARATOR + "201801010000");
pinotFS.mkdir(tableUri2);
@@ -343,11 +353,31 @@ public class SegmentDeletionManagerTest {
pinotFS.mkdir(segment2ForTable2);
deletionManager1.removeAgedDeletedSegments(leadControllerManager);
- // all files should get deleted
- Assert.assertFalse(pinotFS.exists(tableUri2));
+
+ // All files should get deleted but the directory will be deleted in the
next run
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ return pinotFS.listFiles(tableUri2, false).length == 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 10000,
+ "Could not delete all the files for table_2");
+ Assert.assertTrue(pinotFS.exists(tableUri2));
// One file that doesn't meet retention criteria, and another file due to
the per attempt batch limit remains.
- Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 2);
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ return pinotFS.listFiles(tableUri1, false).length == 2;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 10000,
+ "100 out of 102 files could not be deleted from tableUri1 directory");
+
+ // the next run of the deletion manager should remove the empty directory
as well.
+ deletionManager1.removeAgedDeletedSegments(leadControllerManager);
+ Assert.assertFalse(pinotFS.exists(tableUri2));
}
@Test
@@ -369,8 +399,9 @@ public class SegmentDeletionManagerTest {
Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
final File tableDir = new File(tempDir.getAbsolutePath() + File.separator
+ TABLE_NAME);
- final File deletedTableDir = new File(tempDir.getAbsolutePath() +
File.separator + "Deleted_Segments"
- + File.separator + TABLE_NAME);
+ final File deletedTableDir =
+ new File(tempDir.getAbsolutePath() + File.separator +
SegmentDeletionManager.DELETED_SEGMENTS
+ + File.separator + TABLE_NAME);
// delete the segments instantly.
SegmentsValidationAndRetentionConfig mockValidationConfig =
mock(SegmentsValidationAndRetentionConfig.class);
@@ -425,8 +456,9 @@ public class SegmentDeletionManagerTest {
Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
createTableAndSegmentFilesWithGZExtension(tempDir,
segmentsThatShouldBeDeleted());
final File tableDir = new File(tempDir.getAbsolutePath() + File.separator
+ TABLE_NAME);
- final File deletedTableDir = new File(tempDir.getAbsolutePath() +
File.separator + "Deleted_Segments"
- + File.separator + TABLE_NAME);
+ final File deletedTableDir =
+ new File(tempDir.getAbsolutePath() + File.separator +
SegmentDeletionManager.DELETED_SEGMENTS
+ + File.separator + TABLE_NAME);
// mock returning ZK Metadata for segment url
ZNRecord znRecord1 = mock(org.apache.helix.ZNRecord.class);
@@ -493,8 +525,9 @@ public class SegmentDeletionManagerTest {
List<String> segmentsThatShouldBeDeleted = segmentsThatShouldBeDeleted();
createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted);
final File tableDir = new File(tempDir.getAbsolutePath() + File.separator
+ TABLE_NAME);
- final File deletedTableDir = new File(tempDir.getAbsolutePath() +
File.separator + "Deleted_Segments"
- + File.separator + TABLE_NAME);
+ final File deletedTableDir =
+ new File(tempDir.getAbsolutePath() + File.separator +
SegmentDeletionManager.DELETED_SEGMENTS
+ + File.separator + TABLE_NAME);
deletionManager.removeSegmentsFromStoreInBatch(TABLE_NAME,
segmentsThatShouldBeDeleted, 0L);
@@ -569,12 +602,6 @@ public class SegmentDeletionManagerTest {
super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L,
0L);
}
- @Override
- protected void removeSegmentFromStore(String tableName, String segmentId,
- @Nullable Long deletedSegmentsRetentionMs) {
- _segmentsRemovedFromStore.add(segmentId);
- }
-
@Override
protected void deleteSegmentsWithDelay(String tableName,
Collection<String> segmentIds,
@Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
@@ -619,20 +646,39 @@ public class SegmentDeletionManagerTest {
_tableDirs.remove(uri.getPath() + "/");
return true;
}
- // remote the segment
+ // remove the segment
String tableName = uri.getPath().substring(0,
uri.getPath().lastIndexOf("/") + 1);
return _tableDirs.get(tableName).remove(uri.getPath());
}
+ @Override
+ public boolean deleteBatch(List<URI> segmentUris, boolean forceDelete)
+ throws IOException {
+ // the expectation here is that the batch delete call is only limited to
segments.
+ URI segmentURI = segmentUris.get(0);
+ String tableName = segmentURI.getPath().substring(0,
segmentURI.getPath().lastIndexOf("/") + 1);
+ if (_tableDirs.containsKey(tableName)) {
+ // remove all the segments from the table directory
+ segmentUris.forEach(segmentUri ->
_tableDirs.get(tableName).remove(segmentUri.getPath()));
+ return true;
+ }
+ // the table does not exist and we return a false;
+ return false;
+ }
+
@Override
public boolean exists(URI fileUri) {
- return fileUri.getPath().endsWith("Deleted_Segments") ||
_tableDirs.containsKey(fileUri.getPath() + "/");
+ String uriPath = fileUri.getPath();
+ if (uriPath.endsWith("/")) {
+ uriPath = uriPath.substring(0, uriPath.lastIndexOf("/"));
+ }
+ return uriPath.endsWith(SegmentDeletionManager.DELETED_SEGMENTS) ||
_tableDirs.containsKey(uriPath + "/");
}
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
- if (fileUri.getPath().endsWith("Deleted_Segments")) {
+ if (fileUri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS))
{
return _tableDirs.keySet().toArray(new String[0]);
}
// the call to list segments will come without the delimiter after the
table name
@@ -640,9 +686,23 @@ public class SegmentDeletionManagerTest {
return _tableDirs.get(tableName).toArray(new String[0]);
}
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean
recursive) {
+ if (_tableDirs.containsKey(fileUri.getPath() + "/")) {
+ return _tableDirs.get(fileUri.getPath() + "/")
+ .stream()
+ .map(segmentFilePath -> new
FileMetadata.Builder().setFilePath(segmentFilePath).build())
+ .collect(Collectors.toList());
+ }
+ return List.of();
+ }
+
@Override
public boolean isDirectory(URI uri) {
- return true;
+ if (_tableDirs.containsKey(uri.getPath() + "/")) {
+ return true;
+ }
+ return uri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]