This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9145e92f65 Fix committing segments list (#15495) 9145e92f65 is described below commit 9145e92f655ed481d9ceac6e92ba1d0671b1d1c4 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Apr 9 21:45:42 2025 -0600 Fix committing segments list (#15495) --- .../api/resources/PinotRealtimeTableResource.java | 2 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 98 +++++++++------------- .../PinotLLCRealtimeSegmentManagerTest.java | 19 ++--- 3 files changed, 51 insertions(+), 68 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index b2d2292021..595169ce44 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -333,7 +333,7 @@ public class PinotRealtimeTableResource { @ApiResponse(code = 404, message = "Table not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public String getPauslessTableDebugInfo( + public String getPauselessTableDebugInfo( @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | " + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName, @Context HttpHeaders headers) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 52bf05091f..7e93f70e67 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; import org.apache.helix.ClusterMessagingService; @@ -133,7 +134,6 @@ import org.apache.pinot.spi.utils.retry.AttemptFailureException; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.apache.zookeeper.data.Stat; -import org.codehaus.commons.nullanalysis.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2552,66 +2552,37 @@ public class PinotLLCRealtimeSegmentManager { * @param committingSegments List of new segment names that are currently in COMMITTING state. * If null, returns true without making any changes to the existing list * @return true if the synchronization succeeds, false if there's a failure in updating ZooKeeper - * @see #getCommittingSegments for the logic that filters out segments no longer in COMMITTING state */ - public boolean syncCommittingSegments(String realtimeTableName, @NotNull List<String> committingSegments) { + public boolean syncCommittingSegments(String realtimeTableName, List<String> committingSegments) { + String pauselessDebugMetadataPath = + ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); return updateCommittingSegmentsList(realtimeTableName, () -> { - String committingSegmentsListPath = - ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); - // Fetch the committing segments record from the property store. Stat stat = new Stat(); - ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); + ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, stat, AccessOption.PERSISTENT); - // empty ZN record for the table + // Create ZN record if it doesn't exist if (znRecord == null) { znRecord = new ZNRecord(realtimeTableName); znRecord.setListField(COMMITTING_SEGMENTS, committingSegments); - return _propertyStore.create(committingSegmentsListPath, znRecord, AccessOption.PERSISTENT); + return _propertyStore.create(pauselessDebugMetadataPath, znRecord, AccessOption.PERSISTENT); } - Set<String> mergedSegments = new HashSet<>(committingSegments); - // Get segments that are present in the list and are still in COMMITTING status - List<String> existingSegments = - getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS)); - if (existingSegments != null) { - mergedSegments.addAll(existingSegments); + // Check ZK metadata again to get the latest list of committing segments + List<String> committingSegmentsFromPropertyStore = znRecord.getListField(COMMITTING_SEGMENTS); + List<String> latestCommittingSegments; + if (CollectionUtils.isEmpty(committingSegmentsFromPropertyStore)) { + latestCommittingSegments = getCommittingSegments(realtimeTableName, committingSegments); + } else { + Set<String> segmentsToCheck = new HashSet<>(committingSegments); + segmentsToCheck.addAll(committingSegmentsFromPropertyStore); + latestCommittingSegments = getCommittingSegments(realtimeTableName, segmentsToCheck); } - - znRecord.setListField(COMMITTING_SEGMENTS, new ArrayList<>(mergedSegments)); - return _propertyStore.set(committingSegmentsListPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); + znRecord.setListField(COMMITTING_SEGMENTS, latestCommittingSegments); + return _propertyStore.set(pauselessDebugMetadataPath, znRecord, stat.getVersion(), AccessOption.PERSISTENT); }); } - /** - * Filters and returns a list of committing segments for a realtime table. - * This method excludes segments that are either: - * 1. Missing from ZK metadata (likely deleted) - * 2. Already committed (status: DONE) - * - * @param realtimeTableName The name of the realtime table - * @param committingSegmentsFromPropertyStore List of segments from property store, can be null - * @return Filtered list of committing segments, or null if input is null - */ - @Nullable - private List<String> getCommittingSegments(String realtimeTableName, - @Nullable List<String> committingSegmentsFromPropertyStore) { - - if (committingSegmentsFromPropertyStore == null) { - return null; - } - - List<String> committingSegments = new ArrayList<>(); - for (String segment : committingSegmentsFromPropertyStore) { - SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment); - if (segmentZKMetadata == null || Status.DONE.equals(segmentZKMetadata.getStatus())) { - continue; - } - committingSegments.add(segment); - } - return committingSegments; - } - /** * Retrieves and filters the list of committing segments for a realtime table from the property store. * This method: @@ -2620,20 +2591,33 @@ public class PinotLLCRealtimeSegmentManager { * 3. Filters out segments that are either deleted or already committed * * @param realtimeTableName The name of the realtime table to fetch committing segments for - * @return Filtered list of committing segments, or null if no committing segments record exists - * or if the COMMITTING_SEGMENTS field is not present in the ZNRecord + * @return Filtered list of committing segments */ public List<String> getCommittingSegments(String realtimeTableName) { - String committingSegmentsListPath = + String pauselessDebugMetadataPath = ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); - - // Fetch the committing segments record from the property store. - Stat stat = new Stat(); - ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); - if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null) { - return null; + ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, null, AccessOption.PERSISTENT); + if (znRecord == null) { + return List.of(); } - return getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS)); } + + /** + * Returns the list of segments that are in COMMITTING state. Filters out segments that are either deleted or no + * longer in COMMITTING state. + */ + private List<String> getCommittingSegments(String realtimeTableName, @Nullable Collection<String> segmentsToCheck) { + if (CollectionUtils.isEmpty(segmentsToCheck)) { + return List.of(); + } + List<String> committingSegments = new ArrayList<>(segmentsToCheck.size()); + for (String segment : segmentsToCheck) { + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment); + if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == Status.COMMITTING) { + committingSegments.add(segment); + } + } + return committingSegments; + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 9d7ed7dd75..a86cf62e2e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1516,8 +1516,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Test - public void testGetCommittingSegments() - throws HttpErrorStatusException, IOException, URISyntaxException { + public void testGetCommittingSegments() { // mock the behavior for PinotHelixResourceManager PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); HelixManager helixManager = mock(HelixManager.class); @@ -1582,25 +1581,25 @@ public class PinotLLCRealtimeSegmentManagerTest { List<String> result = segmentManager.getCommittingSegments(realtimeTableName); // Verify results - assertNotNull(result); - assertEquals(2, result.size()); - assertTrue(result.contains(testSegments.get(0))); // Should include COMMITTING segment - assertFalse(result.contains(testSegments.get(1))); // Should exclude null metadata segment - assertFalse(result.contains(testSegments.get(2))); // Should exclude DONE segment - assertTrue(result.contains(testSegments.get(3))); // Should include COMMITTING segment + assertEquals(result, List.of(testSegments.get(0), testSegments.get(3))); + + // Test UPLOADED case + when(segmentZKMetadata0.getStatus()).thenReturn(Status.UPLOADED); + result = segmentManager.getCommittingSegments(realtimeTableName); + assertEquals(result, List.of(testSegments.get(3))); // Test null case when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) .thenReturn(null); result = segmentManager.getCommittingSegments(realtimeTableName); - assertNull(result); + assertTrue(result.isEmpty()); // Test empty COMMITTING_SEGMENTS field ZNRecord emptyRecord = new ZNRecord("CommittingSegments"); when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT))) .thenReturn(emptyRecord); result = segmentManager.getCommittingSegments(realtimeTableName); - assertNull(result); + assertTrue(result.isEmpty()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org