This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9145e92f65 Fix committing segments list (#15495)
9145e92f65 is described below

commit 9145e92f655ed481d9ceac6e92ba1d0671b1d1c4
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Apr 9 21:45:42 2025 -0600

    Fix committing segments list (#15495)
---
 .../api/resources/PinotRealtimeTableResource.java  |  2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 98 +++++++++-------------
 .../PinotLLCRealtimeSegmentManagerTest.java        | 19 ++---
 3 files changed, 51 insertions(+), 68 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index b2d2292021..595169ce44 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -333,7 +333,7 @@ public class PinotRealtimeTableResource {
       @ApiResponse(code = 404, message = "Table not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
-  public String getPauslessTableDebugInfo(
+  public String getPauselessTableDebugInfo(
       @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | "
           + "myTable_REALTIME") @PathParam("tableName") String realtimeTableName,
       @Context HttpHeaders headers) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 52bf05091f..7e93f70e67 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
@@ -133,7 +134,6 @@ import org.apache.pinot.spi.utils.retry.AttemptFailureException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.apache.zookeeper.data.Stat;
-import org.codehaus.commons.nullanalysis.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2552,66 +2552,37 @@ public class PinotLLCRealtimeSegmentManager {
    * @param committingSegments List of new segment names that are currently in COMMITTING state.
    *                          If null, returns true without making any changes to the existing list
    * @return true if the synchronization succeeds, false if there's a failure in updating ZooKeeper
-   * @see #getCommittingSegments for the logic that filters out segments no longer in COMMITTING state
    */
-  public boolean syncCommittingSegments(String realtimeTableName, @NotNull List<String> committingSegments) {
+  public boolean syncCommittingSegments(String realtimeTableName, List<String> committingSegments) {
+    String pauselessDebugMetadataPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
     return updateCommittingSegmentsList(realtimeTableName, () -> {
-      String committingSegmentsListPath =
-          
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
-
       // Fetch the committing segments record from the property store.
       Stat stat = new Stat();
-      ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
+      ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, stat, 
AccessOption.PERSISTENT);
 
-      // empty ZN record for the table
+      // Create ZN record if it doesn't exist
       if (znRecord == null) {
         znRecord = new ZNRecord(realtimeTableName);
         znRecord.setListField(COMMITTING_SEGMENTS, committingSegments);
-        return _propertyStore.create(committingSegmentsListPath, znRecord, 
AccessOption.PERSISTENT);
+        return _propertyStore.create(pauselessDebugMetadataPath, znRecord, 
AccessOption.PERSISTENT);
       }
 
-      Set<String> mergedSegments = new HashSet<>(committingSegments);
-      // Get segments that are present in the list and are still in COMMITTING 
status
-      List<String> existingSegments =
-          getCommittingSegments(realtimeTableName, 
znRecord.getListField(COMMITTING_SEGMENTS));
-      if (existingSegments != null) {
-        mergedSegments.addAll(existingSegments);
+      // Check ZK metadata again to get the latest list of committing segments
+      List<String> committingSegmentsFromPropertyStore = 
znRecord.getListField(COMMITTING_SEGMENTS);
+      List<String> latestCommittingSegments;
+      if (CollectionUtils.isEmpty(committingSegmentsFromPropertyStore)) {
+        latestCommittingSegments = getCommittingSegments(realtimeTableName, 
committingSegments);
+      } else {
+        Set<String> segmentsToCheck = new HashSet<>(committingSegments);
+        segmentsToCheck.addAll(committingSegmentsFromPropertyStore);
+        latestCommittingSegments = getCommittingSegments(realtimeTableName, 
segmentsToCheck);
       }
-
-      znRecord.setListField(COMMITTING_SEGMENTS, new 
ArrayList<>(mergedSegments));
-      return _propertyStore.set(committingSegmentsListPath, znRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+      znRecord.setListField(COMMITTING_SEGMENTS, latestCommittingSegments);
+      return _propertyStore.set(pauselessDebugMetadataPath, znRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
     });
   }
 
-  /**
-   * Filters and returns a list of committing segments for a realtime table.
-   * This method excludes segments that are either:
-   * 1. Missing from ZK metadata (likely deleted)
-   * 2. Already committed (status: DONE)
-   *
-   * @param realtimeTableName The name of the realtime table
-   * @param committingSegmentsFromPropertyStore List of segments from property 
store, can be null
-   * @return Filtered list of committing segments, or null if input is null
-   */
-  @Nullable
-  private List<String> getCommittingSegments(String realtimeTableName,
-      @Nullable List<String> committingSegmentsFromPropertyStore) {
-
-    if (committingSegmentsFromPropertyStore == null) {
-      return null;
-    }
-
-    List<String> committingSegments = new ArrayList<>();
-    for (String segment : committingSegmentsFromPropertyStore) {
-      SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment);
-      if (segmentZKMetadata == null || 
Status.DONE.equals(segmentZKMetadata.getStatus())) {
-        continue;
-      }
-      committingSegments.add(segment);
-    }
-    return committingSegments;
-  }
-
   /**
    * Retrieves and filters the list of committing segments for a realtime 
table from the property store.
    * This method:
@@ -2620,20 +2591,33 @@ public class PinotLLCRealtimeSegmentManager {
    * 3. Filters out segments that are either deleted or already committed
    *
    * @param realtimeTableName The name of the realtime table to fetch 
committing segments for
-   * @return Filtered list of committing segments, or null if no committing 
segments record exists
-   *         or if the COMMITTING_SEGMENTS field is not present in the ZNRecord
+   * @return Filtered list of committing segments
    */
   public List<String> getCommittingSegments(String realtimeTableName) {
-    String committingSegmentsListPath =
+    String pauselessDebugMetadataPath =
         
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
-
-    // Fetch the committing segments record from the property store.
-    Stat stat = new Stat();
-    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
-    if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == 
null) {
-      return null;
+    ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, null, 
AccessOption.PERSISTENT);
+    if (znRecord == null) {
+      return List.of();
     }
-
     return getCommittingSegments(realtimeTableName, 
znRecord.getListField(COMMITTING_SEGMENTS));
   }
+
+  /**
+   * Returns the list of segments that are in COMMITTING state. Filters out 
segments that are either deleted or no
+   * longer in COMMITTING state.
+   */
+  private List<String> getCommittingSegments(String realtimeTableName, 
@Nullable Collection<String> segmentsToCheck) {
+    if (CollectionUtils.isEmpty(segmentsToCheck)) {
+      return List.of();
+    }
+    List<String> committingSegments = new ArrayList<>(segmentsToCheck.size());
+    for (String segment : segmentsToCheck) {
+      SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment);
+      if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == 
Status.COMMITTING) {
+        committingSegments.add(segment);
+      }
+    }
+    return committingSegments;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 9d7ed7dd75..a86cf62e2e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1516,8 +1516,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   }
 
   @Test
-  public void testGetCommittingSegments()
-      throws HttpErrorStatusException, IOException, URISyntaxException {
+  public void testGetCommittingSegments() {
     // mock the behavior for PinotHelixResourceManager
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     HelixManager helixManager = mock(HelixManager.class);
@@ -1582,25 +1581,25 @@ public class PinotLLCRealtimeSegmentManagerTest {
     List<String> result = 
segmentManager.getCommittingSegments(realtimeTableName);
 
     // Verify results
-    assertNotNull(result);
-    assertEquals(2, result.size());
-    assertTrue(result.contains(testSegments.get(0))); // Should include 
COMMITTING segment
-    assertFalse(result.contains(testSegments.get(1))); // Should exclude null 
metadata segment
-    assertFalse(result.contains(testSegments.get(2))); // Should exclude DONE 
segment
-    assertTrue(result.contains(testSegments.get(3))); // Should include 
COMMITTING segment
+    assertEquals(result, List.of(testSegments.get(0), testSegments.get(3)));
+
+    // Test UPLOADED case
+    when(segmentZKMetadata0.getStatus()).thenReturn(Status.UPLOADED);
+    result = segmentManager.getCommittingSegments(realtimeTableName);
+    assertEquals(result, List.of(testSegments.get(3)));
 
     // Test null case
     when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), 
eq(AccessOption.PERSISTENT)))
         .thenReturn(null);
     result = segmentManager.getCommittingSegments(realtimeTableName);
-    assertNull(result);
+    assertTrue(result.isEmpty());
 
     // Test empty COMMITTING_SEGMENTS field
     ZNRecord emptyRecord = new ZNRecord("CommittingSegments");
     when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), 
eq(AccessOption.PERSISTENT)))
         .thenReturn(emptyRecord);
     result = segmentManager.getCommittingSegments(realtimeTableName);
-    assertNull(result);
+    assertTrue(result.isEmpty());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to