This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4154559944a Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348)
4154559944a is described below

commit 4154559944ac29e042900a6a622ea0f04b574d8f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jul 16 15:08:43 2025 -0600

    Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348)
---
 .../segment/RealtimeSegmentAssignment.java         | 14 +++-
 .../assignment/segment/SegmentAssignmentUtils.java | 49 +++++++++----
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 82 +++++++++++++---------
 .../core/realtime/SegmentCompletionManager.java    |  1 -
 4 files changed, 97 insertions(+), 49 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index 4909c4b1169..80ed9cdc292 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -20,17 +20,21 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.SegmentUtils;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
@@ -192,8 +196,16 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
             + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, 
completedInstancePartitions,
         consumingInstancePartitions, includeConsuming, bootstrap);
 
+    Set<String> committingSegments = null;
+    if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) {
+      List<String> committingSegmentList = 
PinotLLCRealtimeSegmentManager.getCommittingSegments(_tableNameWithType,
+          _helixManager.getHelixPropertyStore());
+      if (!committingSegmentList.isEmpty()) {
+        committingSegments = new HashSet<>(committingSegmentList);
+      }
+    }
     SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
-        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment);
+        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment,
 committingSegments);
     Map<String, Map<String, String>> newAssignment;
 
     // Reassign COMPLETED segments first
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 027f8defba9..68ca22fbda4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -37,6 +39,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.tier.Tier;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.Pairs;
 
 
@@ -387,19 +390,38 @@ public class SegmentAssignmentUtils {
     private final Map<String, Map<String, String>> _offlineSegmentAssignment = 
new TreeMap<>();
 
     // NOTE: split the segments based on the following criteria:
-    //       1. At least one instance ONLINE -> COMPLETED segment
-    //       2. At least one instance CONSUMING -> CONSUMING segment
+    //       1. At least one instance ONLINE && segment is not COMMITTING -> 
COMPLETED segment
+    //       2. At least one instance CONSUMING || segment is COMMITTING -> 
CONSUMING segment
     //       3. All instances OFFLINE (all instances encountered error while 
consuming) -> OFFLINE segment
-    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, 
String>> segmentAssignment) {
-      for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
-        String segmentName = entry.getKey();
-        Map<String, String> instanceStateMap = entry.getValue();
-        if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
-          _completedSegmentAssignment.put(segmentName, instanceStateMap);
-        } else if 
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
-          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
-        } else {
-          _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, 
String>> segmentAssignment,
+        @Nullable Set<String> committingSegments) {
+      if (CollectionUtils.isEmpty(committingSegments)) {
+        for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+          String segmentName = entry.getKey();
+          Map<String, String> instanceStateMap = entry.getValue();
+          if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+            _completedSegmentAssignment.put(segmentName, instanceStateMap);
+          } else if 
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+            _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+          } else {
+            _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+          }
+        }
+      } else {
+        for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+          String segmentName = entry.getKey();
+          Map<String, String> instanceStateMap = entry.getValue();
+          if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+            if (committingSegments.contains(segmentName)) {
+              _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+            } else {
+              _completedSegmentAssignment.put(segmentName, instanceStateMap);
+            }
+          } else if 
(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
+            _consumingSegmentAssignment.put(segmentName, instanceStateMap);
+          } else {
+            _offlineSegmentAssignment.put(segmentName, instanceStateMap);
+          }
         }
       }
     }
@@ -452,7 +474,8 @@ public class SegmentAssignmentUtils {
           // find an eligible tier for the segment, from the ordered list of 
tiers
           SegmentZKMetadata segmentZKMetadata =
               ZKMetadataProvider.getSegmentZKMetadata(propertyStore, 
tableNameWithType, segmentName);
-          if (segmentZKMetadata != null) {
+          // Skip COMMITTING segments
+          if (segmentZKMetadata != null && segmentZKMetadata.getStatus() != 
Status.COMMITTING) {
             for (Tier tier : sortedTiers) {
               if (tier.getSegmentSelector().selectSegment(tableNameWithType, 
segmentZKMetadata)) {
                 
_tierNameToSegmentAssignmentMap.get(tier.getName()).put(segmentName, 
instanceStateMap);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5f7c9501785..57cff126e40 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
@@ -484,10 +485,9 @@ public class PinotLLCRealtimeSegmentManager {
     Stat stat = new Stat();
     ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
     int expectedVersion = stat.getVersion();
-    LOGGER.info("Committing segments list size: {} before adding the segment: 
{}", Optional.ofNullable(znRecord)
-        .map(record -> record.getListField(COMMITTING_SEGMENTS))
-        .map(List::size)
-        .orElse(0), segmentName);
+    LOGGER.info("Committing segments list size: {} before adding the segment: 
{}",
+        Optional.ofNullable(znRecord).map(record -> 
record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0),
+        segmentName);
 
     // empty ZN record for the table
     if (znRecord == null) {
@@ -521,10 +521,9 @@ public class PinotLLCRealtimeSegmentManager {
     Stat stat = new Stat();
     ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, 
AccessOption.PERSISTENT);
 
-    LOGGER.info("Committing segments list size: {} before removing the 
segment: {}", Optional.ofNullable(znRecord)
-        .map(record -> record.getListField(COMMITTING_SEGMENTS))
-        .map(List::size)
-        .orElse(0), segmentName);
+    LOGGER.info("Committing segments list size: {} before removing the 
segment: {}",
+        Optional.ofNullable(znRecord).map(record -> 
record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0),
+        segmentName);
 
     if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null 
|| !znRecord.getListField(
         COMMITTING_SEGMENTS).contains(segmentName)) {
@@ -806,8 +805,7 @@ public class PinotLLCRealtimeSegmentManager {
       } else {
         LOGGER.info(
             "Skipping creation of new segment metadata after segment: {} 
during commit. Reason: Partition ID: {} not "
-                + "found in upstream metadata.",
-            committingSegmentName, committingSegmentPartitionGroupId);
+                + "found in upstream metadata.", committingSegmentName, 
committingSegmentPartitionGroupId);
       }
     } else {
       LOGGER.info(
@@ -1186,12 +1184,10 @@ public class PinotLLCRealtimeSegmentManager {
     prevSegmentZKMetadata.setSizeThresholdToFlushSegment(newNumRows);
 
     persistSegmentZKMetadata(realtimeTableName, prevSegmentZKMetadata, 
stat.getVersion());
-    _helixResourceManager.resetSegment(
-        realtimeTableName, segmentName, null);
-    LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {} 
to {}",
-        segmentName, prevTargetNumRows, prevNumRows, newNumRows);
-    _controllerMetrics.addMeteredTableValue(
-        realtimeTableName, ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L);
+    _helixResourceManager.resetSegment(realtimeTableName, segmentName, null);
+    LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {} 
to {}", segmentName, prevTargetNumRows,
+        prevNumRows, newNumRows);
+    _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L);
   }
 
   /**
@@ -1272,8 +1268,8 @@ public class PinotLLCRealtimeSegmentManager {
         boolean offsetsHaveToChange = offsetCriteria != null;
         if (isTableEnabled && !isTablePaused) {
           List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-              offsetsHaveToChange
-                  ? Collections.emptyList() // offsets from metadata are not 
valid anymore; fetch for all partitions
+              offsetsHaveToChange ? Collections.emptyList()
+                  // offsets from metadata are not valid anymore; fetch for 
all partitions
                   : getPartitionGroupConsumptionStatusList(idealState, 
streamConfigs);
           // FIXME: Right now, we assume topics are sharing same offset 
criteria
           OffsetCriteria originalOffsetCriteria = 
streamConfigs.get(0).getOffsetCriteria();
@@ -1628,8 +1624,8 @@ public class PinotLLCRealtimeSegmentManager {
 
           // Do not create new CONSUMING segment when the stream partition has 
reached end of life.
           if (!partitionIdToSmallestOffset.containsKey(partitionId)) {
-            LOGGER.info("PartitionGroup: {} has reached end of life. Skipping 
creation of new segment {}",
-                partitionId, latestSegmentName);
+            LOGGER.info("PartitionGroup: {} has reached end of life. Skipping 
creation of new segment {}", partitionId,
+                latestSegmentName);
             continue;
           }
 
@@ -1722,8 +1718,8 @@ public class PinotLLCRealtimeSegmentManager {
         instancePartitionsMap);
   }
 
-  private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(
-      List<StreamConfig> streamConfigs, IdealState idealState) {
+  private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
+      IdealState idealState) {
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = 
new HashMap<>();
     for (StreamConfig streamConfig : streamConfigs) {
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
@@ -2496,10 +2492,8 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
 
-
     if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) {
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
-          ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0);
       _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
           ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0);
       return;
@@ -2518,12 +2512,12 @@ public class PinotLLCRealtimeSegmentManager {
       return;
     } else {
       LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
-      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
-          ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
+      _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, 
ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT,
+          segmentsInErrorStateInAllReplicas.size());
     }
 
     for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
-      SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName);
+      SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName);
       if (segmentZKMetadata == null) {
         LOGGER.warn("Segment metadata not found for segment: {} in table: {}, 
skipping repairing it", segmentName,
             realtimeTableName);
@@ -2704,6 +2698,20 @@ public class PinotLLCRealtimeSegmentManager {
     });
   }
 
+  public List<String> getCommittingSegments(String realtimeTableName) {
+    return getCommittingSegments(realtimeTableName, _propertyStore, 
_helixResourceManager::getSegmentZKMetadata);
+  }
+
+  private List<String> getCommittingSegments(String realtimeTableName, 
Collection<String> segmentsToCheck) {
+    return getCommittingSegments(realtimeTableName, segmentsToCheck, 
_helixResourceManager::getSegmentZKMetadata);
+  }
+
+  public static List<String> getCommittingSegments(String realtimeTableName,
+      ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    return getCommittingSegments(realtimeTableName, propertyStore,
+        (t, s) -> ZKMetadataProvider.getSegmentZKMetadata(propertyStore, t, 
s));
+  }
+
   /**
    * Retrieves and filters the list of committing segments for a realtime 
table from the property store.
    * This method:
@@ -2714,27 +2722,33 @@ public class PinotLLCRealtimeSegmentManager {
    * @param realtimeTableName The name of the realtime table to fetch 
committing segments for
    * @return Filtered list of committing segments
    */
-  public List<String> getCommittingSegments(String realtimeTableName) {
+  private static List<String> getCommittingSegments(String realtimeTableName,
+      ZkHelixPropertyStore<ZNRecord> propertyStore, BiFunction<String, String, 
SegmentZKMetadata> zkMetadataProvider) {
     String pauselessDebugMetadataPath =
         
ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName);
-    ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, null, 
AccessOption.PERSISTENT);
+    ZNRecord znRecord = propertyStore.get(pauselessDebugMetadataPath, null, 
AccessOption.PERSISTENT);
     if (znRecord == null) {
       return List.of();
     }
-    return getCommittingSegments(realtimeTableName, 
znRecord.getListField(COMMITTING_SEGMENTS));
+    List<String> committingSegments = 
znRecord.getListField(COMMITTING_SEGMENTS);
+    if (committingSegments == null) {
+      return List.of();
+    }
+    return getCommittingSegments(realtimeTableName, committingSegments, 
zkMetadataProvider);
   }
 
   /**
    * Returns the list of segments that are in COMMITTING state. Filters out 
segments that are either deleted or no
    * longer in COMMITTING state.
    */
-  private List<String> getCommittingSegments(String realtimeTableName, 
@Nullable Collection<String> segmentsToCheck) {
-    if (CollectionUtils.isEmpty(segmentsToCheck)) {
+  private static List<String> getCommittingSegments(String realtimeTableName, 
Collection<String> segmentsToCheck,
+      BiFunction<String, String, SegmentZKMetadata> zkMetadataProvider) {
+    if (segmentsToCheck.isEmpty()) {
       return List.of();
     }
     List<String> committingSegments = new ArrayList<>(segmentsToCheck.size());
     for (String segment : segmentsToCheck) {
-      SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment);
+      SegmentZKMetadata segmentZKMetadata = 
zkMetadataProvider.apply(realtimeTableName, segment);
       if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == 
Status.COMMITTING) {
         committingSegments.add(segment);
       }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7607f2114a0..520daaeea3b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -127,7 +127,6 @@ public class SegmentCompletionManager {
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
     String segmentName = llcSegmentName.getSegmentName();
     SegmentZKMetadata segmentMetadata = 
_segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
-    Preconditions.checkState(segmentMetadata != null, "Failed to find ZK 
metadata for segment: %s", segmentName);
 
     TableConfig tableConfig = 
_segmentManager.getTableConfig(realtimeTableName);
     String factoryName = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to