This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a209f9b0b1 Adding a new list in the propertystore to capture the 
committing segments. (#15016)
a209f9b0b1 is described below

commit a209f9b0b16c38e2c1559944b86af6a56ba3a70a
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Wed Feb 12 15:21:32 2025 +0530

    Adding a new list in the propertystore to capture the committing segments. 
(#15016)
    
    * Adding a new list in the propertystore to capture the committing segments.
    This makes it quicker to fetch the committing segments without going through
    each segment's ZK metadata.
    
    * Minor improvements and fixing checkstyle violations
    
    * Adding pauselessDebugInfo API to fetch committing segments
    
    * Add support for adding committing segments that are missing from the list
    because adding them failed during the commit start protocol
    
    * Running the addition and deletion of segments from the list in the same ZK
    call. This aims to simplify the code.
    
    * Improving logs for debuggability
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 .../api/resources/PinotRealtimeTableResource.java  |  59 ++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 209 ++++++++++++++++++++-
 .../RealtimeSegmentValidationManager.java          |  23 +++
 .../PinotLLCRealtimeSegmentManagerTest.java        | 171 +++++++++++++++++
 5 files changed, 460 insertions(+), 7 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0729533871..37ba1499e3 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -66,6 +66,7 @@ public class ZKMetadataProvider {
   private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
   private static final String PROPERTYSTORE_CONTROLLER_JOBS_PREFIX = 
"/CONTROLLER_JOBS";
   private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
+  private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX = 
"/PAUSELESS_DEBUG_METADATA";
   private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
   private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = 
"/INSTANCE_PARTITIONS";
   private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX = 
"/CONFIGS/DATABASE";
@@ -246,6 +247,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName, 
segmentName);
   }
 
+  public static String 
constructPropertyStorePathForPauselessDebugMetadata(String resourceName) {
+    return StringUtil.join("/", PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX, 
resourceName);
+  }
+
   public static String constructPropertyStorePathForSchema(String schemaName) {
     return StringUtil.join("/", PROPERTYSTORE_SCHEMAS_PREFIX, schemaName);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index e69de66acb..ab9cfe3035 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
@@ -29,6 +30,7 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -46,6 +48,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.utils.DatabaseUtils;
@@ -318,6 +321,62 @@ public class PinotRealtimeTableResource {
     }
   }
 
+  @GET
+  @Path("/tables/{tableName}/pauselessDebugInfo")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_DEBUG_INFO)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of pauseless table", notes =
+      "Gets the segments that are in error state and segments with COMMITTING 
status in ZK metadata")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPauslessTableDebugInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = 
true, example = "myTable | "
+          + "myTable_REALTIME") @PathParam("tableName") String 
realtimeTableName,
+      @Context HttpHeaders headers) {
+    realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, 
headers);
+    try {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      if (TableType.OFFLINE == tableType) {
+        throw new IllegalStateException("Cannot get consuming segments info 
for OFFLINE table: " + realtimeTableName);
+      }
+
+      String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+
+      Map<String, Object> result = new HashMap<>();
+
+      result.put("instanceToErrorSegmentsMap", 
getInstanceToErrorSegmentsMap(tableNameWithType));
+
+      result.put("committingSegments", 
_pinotLLCRealtimeSegmentManager.getCommittingSegments(tableNameWithType));
+
+      return JsonUtils.objectToPrettyString(result);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get pauseless debug info for table %s. %s", 
realtimeTableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  private Map<String, Set<String>> getInstanceToErrorSegmentsMap(String 
tableNameWithType) {
+    ExternalView externalView = 
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "External view does not 
exist for table: " + tableNameWithType);
+
+    Map<String, Set<String>> instanceToErrorSegmentsMap = new HashMap<>();
+
+    for (String segmentName : externalView.getPartitionSet()) {
+      Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
+      for (String instance : externalViewStateMap.keySet()) {
+        if (CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(
+            externalViewStateMap.get(instance))) {
+          instanceToErrorSegmentsMap.computeIfAbsent(instance, unused -> new 
HashSet<>()).add(segmentName);
+        }
+      }
+    }
+    return instanceToErrorSegmentsMap;
+  }
+
   private void validateTable(String tableNameWithType) {
     IdealState idealState = 
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
     if (idealState == null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e296136975..322401e649 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -36,10 +36,12 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -57,6 +59,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.messages.ForceCommitMessage;
@@ -124,6 +127,7 @@ import 
org.apache.pinot.spi.utils.retry.AttemptFailureException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.apache.zookeeper.data.Stat;
+import org.codehaus.commons.nullanalysis.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,6 +156,8 @@ public class PinotLLCRealtimeSegmentManager {
   public static final String PAUSE_STATE = "pauseState";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
 
+  private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f);
+  public static final String COMMITTING_SEGMENTS = "committingSegments";
   private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence 
number for new table segments
   private static final String METADATA_EVENT_NOTIFIER_PREFIX = 
"metadata.event.notifier";
 
@@ -437,6 +443,66 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  private boolean addSegmentToCommittingSegmentsList(String realtimeTableName, 
String segmentName) {
+    String committingSegmentsListPath =
+        
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
+    int expectedVersion = stat.getVersion();
+    LOGGER.info("Committing segments list size: {} before adding the segment: 
{}", Optional.ofNullable(znRecord)
+        .map(record -> record.getListField(COMMITTING_SEGMENTS))
+        .map(List::size)
+        .orElse(0), segmentName);
+
+    // empty ZN record for the table
+    if (znRecord == null) {
+      znRecord = new ZNRecord(realtimeTableName);
+      znRecord.setListField(COMMITTING_SEGMENTS, List.of(segmentName));
+      return _propertyStore.create(committingSegmentsListPath, znRecord, 
AccessOption.PERSISTENT);
+    }
+
+    // segment already present in the list
+    List<String> committingSegmentList = 
znRecord.getListField(COMMITTING_SEGMENTS);
+    if (committingSegmentList != null && 
committingSegmentList.contains(segmentName)) {
+      return true;
+    }
+
+    if (committingSegmentList == null) {
+      committingSegmentList = List.of(segmentName);
+    } else {
+      committingSegmentList.add(segmentName);
+    }
+    znRecord.setListField(COMMITTING_SEGMENTS, committingSegmentList);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, 
expectedVersion, AccessOption.PERSISTENT);
+    } catch (ZkBadVersionException e) {
+      return false;
+    }
+  }
+
+  private boolean removeSegmentFromCommittingSegmentsList(String 
realtimeTableName, String segmentName) {
+    String committingSegmentsListPath =
+        
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
+
+    LOGGER.info("Committing segments list size: {} before removing the 
segment: {}", Optional.ofNullable(znRecord)
+        .map(record -> record.getListField(COMMITTING_SEGMENTS))
+        .map(List::size)
+        .orElse(0), segmentName);
+
+    if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null 
|| !znRecord.getListField(
+        COMMITTING_SEGMENTS).contains(segmentName)) {
+      return true;
+    }
+    znRecord.getListField(COMMITTING_SEGMENTS).remove(segmentName);
+    try {
+      return _propertyStore.set(committingSegmentsListPath, znRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+    } catch (ZkBadVersionException e) {
+      return false;
+    }
+  }
+
   public IdealState getIdealState(String realtimeTableName) {
     try {
       IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
realtimeTableName);
@@ -611,6 +677,18 @@ public class PinotLLCRealtimeSegmentManager {
     // No-op
   }
 
+  private boolean updateCommittingSegmentsList(String realtimeTableName,
+      Callable<Boolean> operation) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(operation);
+    } catch (Exception e) {
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
+      LOGGER.error("Failed to update committing segments list for table: {}", 
realtimeTableName, e);
+      return false;
+    }
+    return true;
+  }
+
   // Step 1: Update committing segment metadata
   private SegmentZKMetadata updateCommittingSegmentMetadata(String 
realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor, boolean 
isStartMetadata) {
@@ -681,9 +759,10 @@ public class PinotLLCRealtimeSegmentManager {
    * the response is sent to the server to build the segment.
    * <p>
    * This method performs the following actions:
-   * 1. Updates the property store segment metadata status from IN_PROGRESS to 
COMMITTING.
-   * 2. Creates a new property store record for the next consuming segment.
-   * 3. Updates the ideal state to mark the new segment as CONSUMING.
+   * 1. Adds the segment to the committing segment list
+   * 2. Updates the property store segment metadata status from IN_PROGRESS to 
COMMITTING.
+   * 3. Creates a new property store record for the next consuming segment.
+   * 4. Updates the ideal state to mark the new segment as CONSUMING.
    */
   public void commitSegmentStartMetadata(String realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor) {
@@ -693,6 +772,13 @@ public class PinotLLCRealtimeSegmentManager {
 
     try {
       _numCompletingSegments.addAndGet(1);
+      LOGGER.info("Adding segment: {} to committing segment list",
+          committingSegmentDescriptor.getSegmentName());
+      if (!updateCommittingSegmentsList(realtimeTableName,
+          () -> addSegmentToCommittingSegmentsList(realtimeTableName, 
committingSegmentDescriptor.getSegmentName()))) {
+        LOGGER.error("Failed to update committing segments list for table: {}, 
segment: {}", realtimeTableName,
+            committingSegmentDescriptor.getSegmentName());
+      }
       commitSegmentMetadataInternal(realtimeTableName, 
committingSegmentDescriptor, true);
     } finally {
       _numCompletingSegments.addAndGet(-1);
@@ -701,7 +787,8 @@ public class PinotLLCRealtimeSegmentManager {
 
   /**
    * Invoked after the realtime segment has been built and uploaded during 
pauseless ingestion.
-   * Updates the metadata like CRC, download URL, etc. in the Zookeeper 
metadata for the committing segment.
+   * Updates the metadata like CRC, download URL, etc. in the Zookeeper 
metadata for the committing segment
+   * and removes the segment from the committing segment list.
    */
   public void commitSegmentEndMetadata(String realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor) {
@@ -739,6 +826,14 @@ public class PinotLLCRealtimeSegmentManager {
       LOGGER.info("Updating segment ZK metadata for segment: {}", 
committingSegmentName);
       updateCommittingSegmentMetadata(realtimeTableName, 
committingSegmentDescriptor, false);
       LOGGER.info("Successfully updated segment metadata for segment: {}", 
committingSegmentName);
+      // remove the segment from the committing segment list
+      LOGGER.info("Removing segment: {} from committing segment list",
+          committingSegmentDescriptor.getSegmentName());
+      if (!updateCommittingSegmentsList(realtimeTableName,
+          () -> removeSegmentFromCommittingSegmentsList(realtimeTableName, 
committingSegmentName))) {
+        LOGGER.error("Failed to update committing segments list for table: {}, 
segment: {}", realtimeTableName,
+            committingSegmentDescriptor.getSegmentName());
+      }
     } finally {
       _numCompletingSegments.addAndGet(-1);
     }
@@ -968,7 +1063,7 @@ public class PinotLLCRealtimeSegmentManager {
               instanceName);
         }
         return idealState;
-      }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
+      }, DEFAULT_RETRY_POLICY, true);
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
       throw e;
@@ -1086,7 +1181,7 @@ public class PinotLLCRealtimeSegmentManager {
             realtimeTableName, isTableEnabled, isTablePaused);
         return idealState;
       }
-    }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
+    }, DEFAULT_RETRY_POLICY, true);
   }
 
   /**
@@ -1116,7 +1211,7 @@ public class PinotLLCRealtimeSegmentManager {
       
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
           isTablePaused(idealState) ? null : newSegmentName, 
segmentAssignment, instancePartitionsMap);
       return idealState;
-    }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
+    }, DEFAULT_RETRY_POLICY);
   }
 
   public static boolean isTablePaused(IdealState idealState) {
@@ -2228,4 +2323,104 @@ public class PinotLLCRealtimeSegmentManager {
     }
     return segmentsYetToBeCommitted;
   }
+
+  /**
+   * Synchronizes the list of committing segments for a realtime table in 
ZooKeeper by both adding new segments
+   * and removing segments that are no longer in COMMITTING state. This 
function is designed to be called periodically
+   * to maintain an up-to-date list of actively committing segments.
+   *
+   * The synchronization process works as follows:
+   * 1. For a new table (no existing ZooKeeper record), creates a fresh list 
with the provided segments
+   * 2. For an existing table, merges the new segments with currently 
committing segments while removing any
+   *    segments that are no longer in COMMITTING state
+   * 3. Maintains uniqueness of segments using a Set-based deduplication
+   *
+   * @param realtimeTableName Name of the realtime table whose committing 
segments list needs to be synchronized
+   * @param committingSegments List of new segment names that are currently in 
COMMITTING state.
+   *                          If null, returns true without making any changes 
to the existing list
+   * @return true if the synchronization succeeds, false if there's a failure 
in updating ZooKeeper
+   * @see #getCommittingSegments for the logic that filters out segments no 
longer in COMMITTING state
+   */
+  public boolean syncCommittingSegments(String realtimeTableName, @NotNull 
List<String> committingSegments) {
+    return updateCommittingSegmentsList(realtimeTableName, () -> {
+      String committingSegmentsListPath =
+          
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+      // Fetch the committing segments record from the property store.
+      Stat stat = new Stat();
+      ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
+
+      // empty ZN record for the table
+      if (znRecord == null) {
+        znRecord = new ZNRecord(realtimeTableName);
+        znRecord.setListField(COMMITTING_SEGMENTS, committingSegments);
+        return _propertyStore.create(committingSegmentsListPath, znRecord, 
AccessOption.PERSISTENT);
+      }
+
+      Set<String> mergedSegments = new HashSet<>(committingSegments);
+      // Get segments that are present in the list and are still in COMMITTING 
status
+      List<String> existingSegments =
+          getCommittingSegments(realtimeTableName, 
znRecord.getListField(COMMITTING_SEGMENTS));
+      if (existingSegments != null) {
+        mergedSegments.addAll(existingSegments);
+      }
+
+      znRecord.setListField(COMMITTING_SEGMENTS, new 
ArrayList<>(mergedSegments));
+      return _propertyStore.set(committingSegmentsListPath, znRecord, 
stat.getVersion(), AccessOption.PERSISTENT);
+    });
+  }
+
+  /**
+   * Filters and returns a list of committing segments for a realtime table.
+   * This method excludes segments that are either:
+   * 1. Missing from ZK metadata (likely deleted)
+   * 2. Already committed (status: DONE)
+   *
+   * @param realtimeTableName The name of the realtime table
+   * @param committingSegmentsFromPropertyStore List of segments from property 
store, can be null
+   * @return Filtered list of committing segments, or null if input is null
+   */
+  @Nullable
+  private List<String> getCommittingSegments(String realtimeTableName,
+      @Nullable List<String> committingSegmentsFromPropertyStore) {
+
+    if (committingSegmentsFromPropertyStore == null) {
+      return null;
+    }
+
+    List<String> committingSegments = new ArrayList<>();
+    for (String segment : committingSegmentsFromPropertyStore) {
+      SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment);
+      if (segmentZKMetadata == null || 
Status.DONE.equals(segmentZKMetadata.getStatus())) {
+        continue;
+      }
+      committingSegments.add(segment);
+    }
+    return committingSegments;
+  }
+
+  /**
+   * Retrieves and filters the list of committing segments for a realtime 
table from the property store.
+   * This method:
+   * 1. Constructs the ZK path for pauseless debug metadata
+   * 2. Fetches the committing segments record from the property store
+   * 3. Filters out segments that are either deleted or already committed
+   *
+   * @param realtimeTableName The name of the realtime table to fetch 
committing segments for
+   * @return Filtered list of committing segments, or null if no committing 
segments record exists
+   *         or if the COMMITTING_SEGMENTS field is not present in the ZNRecord
+   */
+  public List<String> getCommittingSegments(String realtimeTableName) {
+    String committingSegmentsListPath =
+        
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+    // Fetch the committing segments record from the property store.
+    Stat stat = new Stat();
+    ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
+    if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == 
null) {
+      return null;
+    }
+
+    return getCommittingSegments(realtimeTableName, 
znRecord.getListField(COMMITTING_SEGMENTS));
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5ea86bd9fb..db832aede6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
@@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -176,6 +179,12 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     // Update the total document count gauge
     _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName, 
computeTotalDocumentCount(segmentsZKMetadata));
 
+    // Ensures all segments in COMMITTING state are properly tracked in 
ZooKeeper.
+    // Acts as a recovery mechanism for segments that may have failed to 
register during start of commit protocol.
+    if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+      syncCommittingSegmentsFromMetadata(realtimeTableName, 
segmentsZKMetadata);
+    }
+
     // Check missing segments and upload them to the deep store
     if (_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
       _llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, 
segmentsZKMetadata);
@@ -186,6 +195,20 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     }
   }
 
+  private void syncCommittingSegmentsFromMetadata(String realtimeTableName,
+      List<SegmentZKMetadata> segmentsZKMetadata) {
+    List<String> committingSegments = new ArrayList<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      if 
(CommonConstants.Segment.Realtime.Status.COMMITTING.equals(segmentZKMetadata.getStatus()))
 {
+        committingSegments.add(segmentZKMetadata.getSegmentName());
+      }
+    }
+    LOGGER.info("Adding committing segments to ZK: {}", committingSegments);
+    if (!_llcRealtimeSegmentManager.syncCommittingSegments(realtimeTableName, 
committingSegments)) {
+      LOGGER.error("Failed to add committing segments for table: {}", 
realtimeTableName);
+    }
+  }
+
   @Override
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
     for (String tableNameWithType : tableNamesWithType) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index fcbb5c4a68..fd1f9d67c6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -44,6 +45,7 @@ import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
@@ -53,6 +55,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
@@ -94,6 +97,7 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
+import static 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.COMMITTING_SEGMENTS;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
@@ -1482,6 +1486,173 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assert ImmutableSet.of("s2", "s4", "s5").equals(segmentsYetToBeCommitted);
   }
 
+  /**
+   * Verifies that {@code getCommittingSegments} keeps only the listed segments whose ZK metadata status is
+   * still COMMITTING (dropping segments whose metadata is missing or whose status is DONE), and returns null
+   * when the committing-segments record is absent or has no COMMITTING_SEGMENTS list field.
+   */
+  @Test
+  public void testGetCommittingSegments()
+      throws HttpErrorStatusException, IOException, URISyntaxException {
+    // mock the behavior for PinotHelixResourceManager
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    HelixManager helixManager = mock(HelixManager.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    // mock() on the raw class literal yields a raw type, so this cast is unchecked by construction.
+    @SuppressWarnings("unchecked")
+    ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+    when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+    when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+    // init fake PinotLLCRealtimeSegmentManager
+    ControllerConf controllerConfig = new ControllerConf();
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
+
+    // Test table name
+    String realtimeTableName = "githubEvents_2_REALTIME";
+
+    // Create test segments
+    List<String> testSegments = List.of(
+        "githubEvents_2__0__0__20250210T1142Z",
+        "githubEvents_2__0__1__20250210T1142Z",
+        "githubEvents_2__0__2__20250210T1142Z",
+        "githubEvents_2__0__3__20250210T1142Z"
+    );
+
+    // mock response of propertyStore
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+    ZNRecord znRecord = new ZNRecord(realtimeTableName);
+    znRecord.setListField(COMMITTING_SEGMENTS, testSegments);
+
+    when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(znRecord);
+
+    // mock response for fetching segmentZKMetadata with different scenarios
+    // Segment 0: COMMITTING status
+    SegmentZKMetadata segmentZKMetadata0 = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata0.getStatus()).thenReturn(Status.COMMITTING);
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(0)))
+        .thenReturn(segmentZKMetadata0);
+
+    // Segment 1: null metadata (deleted)
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(1)))
+        .thenReturn(null);
+
+    // Segment 2: DONE status
+    SegmentZKMetadata segmentZKMetadata2 = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata2.getStatus()).thenReturn(Status.DONE);
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(2)))
+        .thenReturn(segmentZKMetadata2);
+
+    // Segment 3: COMMITTING status
+    SegmentZKMetadata segmentZKMetadata3 = mock(SegmentZKMetadata.class);
+    when(segmentZKMetadata3.getStatus()).thenReturn(Status.COMMITTING);
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, testSegments.get(3)))
+        .thenReturn(segmentZKMetadata3);
+
+    // Execute test
+    List<String> result = segmentManager.getCommittingSegments(realtimeTableName);
+
+    // Verify results; assertEquals takes (actual, expected), as in TestNG's Assert.
+    assertNotNull(result);
+    assertEquals(result.size(), 2);
+    assertTrue(result.contains(testSegments.get(0))); // Should include COMMITTING segment
+    assertFalse(result.contains(testSegments.get(1))); // Should exclude null metadata segment
+    assertFalse(result.contains(testSegments.get(2))); // Should exclude DONE segment
+    assertTrue(result.contains(testSegments.get(3))); // Should include COMMITTING segment
+
+    // Test null case
+    when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(null);
+    result = segmentManager.getCommittingSegments(realtimeTableName);
+    assertNull(result);
+
+    // Test empty COMMITTING_SEGMENTS field
+    ZNRecord emptyRecord = new ZNRecord("CommittingSegments");
+    when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(emptyRecord);
+    result = segmentManager.getCommittingSegments(realtimeTableName);
+    assertNull(result);
+  }
+
+  /**
+   * Verifies {@code syncCommittingSegments}:
+   * (1) it succeeds through {@code create} when no committing-segments record exists yet;
+   * (2) it merges new segments into an existing list, writing each COMMITTING segment once and dropping DONE ones;
+   * (3) it returns false when the ZooKeeper write throws.
+   */
+  @Test
+  public void testSyncCommittingSegments() throws Exception {
+    // Set up mocks for the resource management infrastructure
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    HelixManager helixManager = mock(HelixManager.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    // mock() on the raw class literal yields a raw type, so this cast is unchecked by construction.
+    @SuppressWarnings("unchecked")
+    ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
+        (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
+
+    // Configure basic mock behaviors
+    when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+    when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+    // Initialize the segment manager
+    ControllerConf controllerConfig = new ControllerConf();
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
+
+    String realtimeTableName = "testTable_REALTIME";
+    String committingSegmentsListPath =
+        ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
+
+    // Create test segments with different states
+    String committingSegment1 = "testTable__0__0__20250210T1142Z";
+    String committingSegment2 = "testTable__0__1__20250210T1142Z";
+    String doneSegment = "testTable__0__2__20250210T1142Z";
+
+    // Set up segment metadata mocks
+    SegmentZKMetadata committingMetadata1 = mock(SegmentZKMetadata.class);
+    when(committingMetadata1.getStatus()).thenReturn(Status.COMMITTING);
+
+    SegmentZKMetadata committingMetadata2 = mock(SegmentZKMetadata.class);
+    when(committingMetadata2.getStatus()).thenReturn(Status.COMMITTING);
+
+    SegmentZKMetadata doneMetadata = mock(SegmentZKMetadata.class);
+    when(doneMetadata.getStatus()).thenReturn(Status.DONE);
+
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, committingSegment1)).thenReturn(
+        committingMetadata1);
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, committingSegment2)).thenReturn(
+        committingMetadata2);
+    when(pinotHelixResourceManager.getSegmentZKMetadata(realtimeTableName, doneSegment)).thenReturn(doneMetadata);
+
+    // Test 1: Initial creation when no committing-segments record exists yet
+    List<String> newSegments = Arrays.asList(committingSegment1, committingSegment2);
+    when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(null);
+    when(zkHelixPropertyStore.create(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(true);
+
+    assertTrue(segmentManager.syncCommittingSegments(realtimeTableName, newSegments));
+
+    // Test 2: Syncing into an existing list that holds a COMMITTING segment and a DONE segment
+    ZNRecord existingRecord = new ZNRecord(realtimeTableName);
+    existingRecord.setListField(COMMITTING_SEGMENTS,
+        Arrays.asList(committingSegment2, doneSegment));
+
+    when(zkHelixPropertyStore.get(eq(committingSegmentsListPath), any(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(existingRecord);
+    when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(), anyInt(), eq(AccessOption.PERSISTENT)))
+        .thenReturn(true);
+
+    // There should not be any duplicates and the doneSegment should be removed from the list
+    assertTrue(segmentManager.syncCommittingSegments(realtimeTableName,
+        Arrays.asList(committingSegment1, committingSegment2)));
+    // Inspect the record actually written to ZK, not the mocked read result, so the test does not depend on
+    // the fetched record being mutated in place. Matching size plus containsAll also rules out duplicates.
+    List<String> expectedSegments = List.of(committingSegment1, committingSegment2);
+    verify(zkHelixPropertyStore).set(eq(committingSegmentsListPath), argThat(written -> {
+      List<String> writtenSegments = written.getListField(COMMITTING_SEGMENTS);
+      return writtenSegments.size() == expectedSegments.size() && writtenSegments.containsAll(expectedSegments);
+    }), anyInt(), eq(AccessOption.PERSISTENT));
+
+    // Test 3: Error handling during ZooKeeper operations
+    when(zkHelixPropertyStore.set(eq(committingSegmentsListPath), any(), anyInt(), eq(AccessOption.PERSISTENT)))
+        .thenThrow(new RuntimeException("ZooKeeper operation failed"));
+    assertFalse(segmentManager.syncCommittingSegments(realtimeTableName, newSegments));
+  }
+
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   
/////////////////////////////////////////////////////////////////////////////////


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to