This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e9f3ccc78bb Fixing flaky test for
SegmentDeletionManagerTest.testRemoveDeletedSegments() (#17042)
e9f3ccc78bb is described below
commit e9f3ccc78bbaed670a655a4613b5f65c513c005c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Oct 20 16:39:43 2025 -0700
Fixing flaky test for
SegmentDeletionManagerTest.testRemoveDeletedSegments() (#17042)
---
.../core/util/SegmentDeletionManagerTest.java | 111 +++++++++++++--------
1 file changed, 69 insertions(+), 42 deletions(-)
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index 2db31ac2049..e1f60398aaf 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
@@ -124,11 +123,7 @@ public class SegmentDeletionManagerTest {
for (int i = 0; i < result.length; i++) {
final String path = propStoreList.get(i);
final String segmentId = path.substring((path.lastIndexOf('/') + 1));
- if (failedSegs.indexOf(segmentId) < 0) {
- result[i] = true;
- } else {
- result[i] = false;
- }
+ result[i] = !failedSegs.contains(segmentId);
}
return result;
}
@@ -263,9 +258,9 @@ public class SegmentDeletionManagerTest {
// Test delete when there is no files but some directories exist
deletionManager.removeAgedDeletedSegments(leadControllerManager);
- Assert.assertEquals(dummyDir1.exists(), false);
- Assert.assertEquals(dummyDir2.exists(), false);
- Assert.assertEquals(dummyDir3.exists(), false);
+ Assert.assertFalse(dummyDir1.exists());
+ Assert.assertFalse(dummyDir2.exists());
+ Assert.assertFalse(dummyDir3.exists());
// Create dummy directories and files again
dummyDir1.mkdir();
@@ -304,18 +299,18 @@ public class SegmentDeletionManagerTest {
TestUtils.waitForCondition((aVoid) -> dummyDir2.exists(), 1000, 100000,
"dummyDir2 does not exist");
-
// Check that deleted file without retention suffix is honoring
cluster-wide retention period of 7 days.
TestUtils.waitForCondition((aVoid) -> dummyDir3.list().length == 1, 1000,
100000,
"Unable to delete desired segments from dummyDir3");
// Try to remove empty directory in the next run
deletionManager.removeAgedDeletedSegments(leadControllerManager);
- Assert.assertFalse(dummyDir2.exists());
+ TestUtils.waitForCondition((aVoid) -> !dummyDir2.exists(), 1000, 100000,
+ "dummyDir2 still exists");
}
@Test
- public void testRemoveDeletedSegmentsForGcsPinotFS()
+ public void testRemoveDeletedSegmentsForFakePinotFS()
throws URISyntaxException, IOException, InterruptedException {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY
+ ".class",
@@ -436,7 +431,6 @@ public class SegmentDeletionManagerTest {
}, 2000L, 10_000L, "Unable to verify table deletion with retention");
}
-
@Test
public void testSegmentDeletionLogicWithFileWithGZExtension()
throws Exception {
@@ -567,7 +561,6 @@ public class SegmentDeletionManagerTest {
}
}
-
public String genDeletedSegmentName(String fileName, int age, int
retentionInDays) {
// adding one more hours to the deletion time just to make sure the test
goes pass the retention period because
// we no longer keep second level info in the date format.
@@ -617,11 +610,14 @@ public class SegmentDeletionManagerTest {
public static class FakePinotFs extends LocalPinotFS {
- private Map<String, Set<String>> _tableDirs;
+ // SegmentDeletionManager submits deletions on a background
single-threaded executor while the
+ // test thread may call exists()/listFiles() concurrently; synchronize all
accesses to `_tableDirs`
+ // to avoid races/CMEs and ensure cross-thread visibility in this test
stub.
+ private final Map<String, Set<String>> _tableDirs = new HashMap<>();
@Override
public void init(PinotConfiguration configuration) {
- _tableDirs = new HashMap<>();
+ _tableDirs.clear();
}
@Override
@@ -629,26 +625,34 @@ public class SegmentDeletionManagerTest {
throws IOException {
// create a new table Dir if the path ends with /
if (uri.getPath().endsWith("/")) {
- _tableDirs.put(uri.getPath(), new HashSet<>());
+ synchronized (_tableDirs) {
+ _tableDirs.put(uri.getPath(), new HashSet<>());
+ }
return true;
}
// add the segment to the table dir
// we are including / in the table path to replicate responses of
GcsPinotFs
String tableName = uri.getPath().substring(0,
uri.getPath().lastIndexOf("/") + 1);
- return _tableDirs.get(tableName).add(uri.getPath());
+ synchronized (_tableDirs) {
+ Set<String> segments = _tableDirs.get(tableName);
+ return segments != null && segments.add(uri.getPath());
+ }
}
@Override
public boolean delete(URI uri, boolean forceDelete)
throws IOException {
// delete the key if it's a table
- if (_tableDirs.containsKey(uri.getPath() + "/")) {
- _tableDirs.remove(uri.getPath() + "/");
- return true;
+ synchronized (_tableDirs) {
+ if (_tableDirs.containsKey(uri.getPath() + "/")) {
+ _tableDirs.remove(uri.getPath() + "/");
+ return true;
+ }
+ // remove the segment
+ String tableName = uri.getPath().substring(0,
uri.getPath().lastIndexOf("/") + 1);
+ Set<String> segments = _tableDirs.get(tableName);
+ return segments != null && segments.remove(uri.getPath());
}
- // remove the segment
- String tableName = uri.getPath().substring(0,
uri.getPath().lastIndexOf("/") + 1);
- return _tableDirs.get(tableName).remove(uri.getPath());
}
@Override
@@ -657,13 +661,20 @@ public class SegmentDeletionManagerTest {
// the expectation here is that the batch delete call is only limited to
segments.
URI segmentURI = segmentUris.get(0);
String tableName = segmentURI.getPath().substring(0,
segmentURI.getPath().lastIndexOf("/") + 1);
- if (_tableDirs.containsKey(tableName)) {
- // remove all the segments from the table directory
- segmentUris.forEach(segmentUri ->
_tableDirs.get(tableName).remove(segmentUri.getPath()));
- return true;
+ synchronized (_tableDirs) {
+ if (_tableDirs.containsKey(tableName)) {
+ // remove all the segments from the table directory
+ Set<String> segments = _tableDirs.get(tableName);
+ for (URI segmentUri : segmentUris) {
+ if (segments != null) {
+ segments.remove(segmentUri.getPath());
+ }
+ }
+ return true;
+ }
+ // the table does not exist and we return a false;
+ return false;
}
- // the table does not exist and we return a false;
- return false;
}
@Override
@@ -672,37 +683,53 @@ public class SegmentDeletionManagerTest {
if (uriPath.endsWith("/")) {
uriPath = uriPath.substring(0, uriPath.lastIndexOf("/"));
}
- return uriPath.endsWith(SegmentDeletionManager.DELETED_SEGMENTS) ||
_tableDirs.containsKey(uriPath + "/");
+ synchronized (_tableDirs) {
+ return uriPath.endsWith(SegmentDeletionManager.DELETED_SEGMENTS) ||
_tableDirs.containsKey(uriPath + "/");
+ }
}
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
if (fileUri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS))
{
- return _tableDirs.keySet().toArray(new String[0]);
+ synchronized (_tableDirs) {
+ return _tableDirs.keySet().toArray(new String[0]);
+ }
}
// the call to list segments will come without the delimiter after the
table name
String tableName = fileUri.getPath().endsWith("/") ? fileUri.getPath() :
fileUri.getPath() + "/";
- return _tableDirs.get(tableName).toArray(new String[0]);
+ synchronized (_tableDirs) {
+ Set<String> segments = _tableDirs.get(tableName);
+ if (segments == null) {
+ return new String[0];
+ }
+ return segments.toArray(new String[0]);
+ }
}
@Override
public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean
recursive) {
- if (_tableDirs.containsKey(fileUri.getPath() + "/")) {
- return _tableDirs.get(fileUri.getPath() + "/")
- .stream()
- .map(segmentFilePath -> new
FileMetadata.Builder().setFilePath(segmentFilePath).build())
- .collect(Collectors.toList());
+ synchronized (_tableDirs) {
+ if (_tableDirs.containsKey(fileUri.getPath() + "/")) {
+ Set<String> segments = _tableDirs.get(fileUri.getPath() + "/");
+ List<FileMetadata> result = new ArrayList<>(segments.size());
+ for (String segmentFilePath : segments) {
+ result.add(new
FileMetadata.Builder().setFilePath(segmentFilePath).build());
+ }
+ return result;
+ }
+ return List.of();
}
- return List.of();
}
@Override
public boolean isDirectory(URI uri) {
- if (_tableDirs.containsKey(uri.getPath() + "/")) {
- return true;
+ synchronized (_tableDirs) {
+ if (_tableDirs.containsKey(uri.getPath() + "/")) {
+ return true;
+ }
+ return uri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS);
}
- return uri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]