This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4154559944a Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348) 4154559944a is described below commit 4154559944ac29e042900a6a622ea0f04b574d8f Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Jul 16 15:08:43 2025 -0600 Fix rebalance logic to treat COMMITTING segments as CONSUMING (#16348) --- .../segment/RealtimeSegmentAssignment.java | 14 +++- .../assignment/segment/SegmentAssignmentUtils.java | 49 +++++++++---- .../realtime/PinotLLCRealtimeSegmentManager.java | 82 +++++++++++++--------- .../core/realtime/SegmentCompletionManager.java | 1 - 4 files changed, 97 insertions(+), 49 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 4909c4b1169..80ed9cdc292 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -20,17 +20,21 @@ package org.apache.pinot.controller.helix.core.assignment.segment; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.tier.Tier; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.SegmentUtils; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy; import org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory; +import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; @@ -192,8 +196,16 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment { + "includeConsuming: {}, bootstrap: {}", _tableNameWithType, completedInstancePartitions, consumingInstancePartitions, includeConsuming, bootstrap); + Set<String> committingSegments = null; + if (PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)) { + List<String> committingSegmentList = PinotLLCRealtimeSegmentManager.getCommittingSegments(_tableNameWithType, + _helixManager.getHelixPropertyStore()); + if (!committingSegmentList.isEmpty()) { + committingSegments = new HashSet<>(committingSegmentList); + } + } SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = - new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment); + new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment, committingSegments); Map<String, Map<String, String>> newAssignment; // Reassign COMPLETED segments first diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 027f8defba9..68ca22fbda4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -37,6 +39,7 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.Pairs; @@ -387,19 +390,38 @@ public class SegmentAssignmentUtils { private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>(); // NOTE: split the segments based on the following criteria: - // 1. At least one instance ONLINE -> COMPLETED segment - // 2. At least one instance CONSUMING -> CONSUMING segment + // 1. At least one instance ONLINE && segment is not COMMITTING -> COMPLETED segment + // 2. At least one instance CONSUMING || segment is COMMITTING -> CONSUMING segment // 3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment - CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) { - for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { - _completedSegmentAssignment.put(segmentName, instanceStateMap); - } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { - _consumingSegmentAssignment.put(segmentName, instanceStateMap); - } else { - _offlineSegmentAssignment.put(segmentName, instanceStateMap); + CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment, + @Nullable Set<String> committingSegments) { + if (CollectionUtils.isEmpty(committingSegments)) { + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { + _completedSegmentAssignment.put(segmentName, instanceStateMap); + } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { + _consumingSegmentAssignment.put(segmentName, instanceStateMap); + } else { + _offlineSegmentAssignment.put(segmentName, instanceStateMap); + } + } + } else { + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { + if (committingSegments.contains(segmentName)) { + _consumingSegmentAssignment.put(segmentName, instanceStateMap); + } else { + _completedSegmentAssignment.put(segmentName, instanceStateMap); + } + } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { + _consumingSegmentAssignment.put(segmentName, instanceStateMap); + } else { + _offlineSegmentAssignment.put(segmentName, instanceStateMap); + } } } } @@ -452,7 +474,8 @@ public class SegmentAssignmentUtils { // find an eligible tier for the segment, from the ordered list of tiers SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType, segmentName); - if (segmentZKMetadata != null) { + // Skip COMMITTING segments + if (segmentZKMetadata != null && segmentZKMetadata.getStatus() != Status.COMMITTING) { for (Tier tier : sortedTiers) { if (tier.getSegmentSelector().selectSegment(tableNameWithType, segmentZKMetadata)) { _tierNameToSegmentAssignmentMap.get(tier.getName()).put(segmentName, instanceStateMap); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 5f7c9501785..57cff126e40 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; @@ -484,10 +485,9 @@ public class PinotLLCRealtimeSegmentManager { Stat stat = new Stat(); ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); int expectedVersion = stat.getVersion(); - LOGGER.info("Committing segments list size: {} before adding the segment: {}", Optional.ofNullable(znRecord) - .map(record -> record.getListField(COMMITTING_SEGMENTS)) - .map(List::size) - .orElse(0), segmentName); + LOGGER.info("Committing segments list size: {} before adding the segment: {}", + Optional.ofNullable(znRecord).map(record -> record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0), + segmentName); // empty ZN record for the table if (znRecord == null) { @@ -521,10 +521,9 @@ public class PinotLLCRealtimeSegmentManager { Stat stat = new Stat(); ZNRecord znRecord = _propertyStore.get(committingSegmentsListPath, stat, AccessOption.PERSISTENT); - LOGGER.info("Committing segments list size: {} before removing the segment: {}", Optional.ofNullable(znRecord) - .map(record -> record.getListField(COMMITTING_SEGMENTS)) - .map(List::size) - .orElse(0), segmentName); + LOGGER.info("Committing segments list size: {} before removing the segment: {}", + Optional.ofNullable(znRecord).map(record -> record.getListField(COMMITTING_SEGMENTS)).map(List::size).orElse(0), + segmentName); if (znRecord == null || znRecord.getListField(COMMITTING_SEGMENTS) == null || !znRecord.getListField( COMMITTING_SEGMENTS).contains(segmentName)) { @@ -806,8 +805,7 @@ public class PinotLLCRealtimeSegmentManager { } else { LOGGER.info( "Skipping creation of new segment metadata after segment: {} during commit. Reason: Partition ID: {} not " - + "found in upstream metadata.", - committingSegmentName, committingSegmentPartitionGroupId); + + "found in upstream metadata.", committingSegmentName, committingSegmentPartitionGroupId); } } else { LOGGER.info( @@ -1186,12 +1184,10 @@ public class PinotLLCRealtimeSegmentManager { prevSegmentZKMetadata.setSizeThresholdToFlushSegment(newNumRows); persistSegmentZKMetadata(realtimeTableName, prevSegmentZKMetadata, stat.getVersion()); - _helixResourceManager.resetSegment( - realtimeTableName, segmentName, null); - LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {} to {}", - segmentName, prevTargetNumRows, prevNumRows, newNumRows); - _controllerMetrics.addMeteredTableValue( - realtimeTableName, ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L); + _helixResourceManager.resetSegment(realtimeTableName, segmentName, null); + LOGGER.info("Reduced segment size of {} from prevTarget {} prevActual {} to {}", segmentName, prevTargetNumRows, + prevNumRows, newNumRows); + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_SIZE_AUTO_REDUCTION, 1L); } /** @@ -1272,8 +1268,8 @@ public class PinotLLCRealtimeSegmentManager { boolean offsetsHaveToChange = offsetCriteria != null; if (isTableEnabled && !isTablePaused) { List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + offsetsHaveToChange ? Collections.emptyList() + // offsets from metadata are not valid anymore; fetch for all partitions : getPartitionGroupConsumptionStatusList(idealState, streamConfigs); // FIXME: Right now, we assume topics are sharing same offset criteria OffsetCriteria originalOffsetCriteria = streamConfigs.get(0).getOffsetCriteria(); @@ -1628,8 +1624,8 @@ public class PinotLLCRealtimeSegmentManager { // Do not create new CONSUMING segment when the stream partition has reached end of life. if (!partitionIdToSmallestOffset.containsKey(partitionId)) { - LOGGER.info("PartitionGroup: {} has reached end of life. Skipping creation of new segment {}", - partitionId, latestSegmentName); + LOGGER.info("PartitionGroup: {} has reached end of life. Skipping creation of new segment {}", partitionId, + latestSegmentName); continue; } @@ -1722,8 +1718,8 @@ public class PinotLLCRealtimeSegmentManager { instancePartitionsMap); } - private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset( - List<StreamConfig> streamConfigs, IdealState idealState) { + private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs, + IdealState idealState) { Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); for (StreamConfig streamConfig : streamConfigs) { List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = @@ -2496,10 +2492,8 @@ public class PinotLLCRealtimeSegmentManager { } } - if (segmentsInErrorStateInAtLeastOneReplica.isEmpty()) { - _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, - ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0); + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 0); _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 0); return; @@ -2518,12 +2512,12 @@ public class PinotLLCRealtimeSegmentManager { return; } else { LOGGER.info("Repairing error segments in table: {}.", realtimeTableName); - _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, - ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); + _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, + segmentsInErrorStateInAllReplicas.size()); } for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) { - SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName); + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segmentName); if (segmentZKMetadata == null) { LOGGER.warn("Segment metadata not found for segment: {} in table: {}, skipping repairing it", segmentName, realtimeTableName); @@ -2704,6 +2698,20 @@ public class PinotLLCRealtimeSegmentManager { }); } + public List<String> getCommittingSegments(String realtimeTableName) { + return getCommittingSegments(realtimeTableName, _propertyStore, _helixResourceManager::getSegmentZKMetadata); + } + + private List<String> getCommittingSegments(String realtimeTableName, Collection<String> segmentsToCheck) { + return getCommittingSegments(realtimeTableName, segmentsToCheck, _helixResourceManager::getSegmentZKMetadata); + } + + public static List<String> getCommittingSegments(String realtimeTableName, + ZkHelixPropertyStore<ZNRecord> propertyStore) { + return getCommittingSegments(realtimeTableName, propertyStore, + (t, s) -> ZKMetadataProvider.getSegmentZKMetadata(propertyStore, t, s)); + } + /** * Retrieves and filters the list of committing segments for a realtime table from the property store. * This method: @@ -2714,27 +2722,33 @@ public class PinotLLCRealtimeSegmentManager { * @param realtimeTableName The name of the realtime table to fetch committing segments for * @return Filtered list of committing segments */ - public List<String> getCommittingSegments(String realtimeTableName) { + private static List<String> getCommittingSegments(String realtimeTableName, + ZkHelixPropertyStore<ZNRecord> propertyStore, BiFunction<String, String, SegmentZKMetadata> zkMetadataProvider) { String pauselessDebugMetadataPath = ZKMetadataProvider.constructPropertyStorePathForPauselessDebugMetadata(realtimeTableName); - ZNRecord znRecord = _propertyStore.get(pauselessDebugMetadataPath, null, AccessOption.PERSISTENT); + ZNRecord znRecord = propertyStore.get(pauselessDebugMetadataPath, null, AccessOption.PERSISTENT); if (znRecord == null) { return List.of(); } - return getCommittingSegments(realtimeTableName, znRecord.getListField(COMMITTING_SEGMENTS)); + List<String> committingSegments = znRecord.getListField(COMMITTING_SEGMENTS); + if (committingSegments == null) { + return List.of(); + } + return getCommittingSegments(realtimeTableName, committingSegments, zkMetadataProvider); } /** * Returns the list of segments that are in COMMITTING state. Filters out segments that are either deleted or no * longer in COMMITTING state. */ - private List<String> getCommittingSegments(String realtimeTableName, @Nullable Collection<String> segmentsToCheck) { - if (CollectionUtils.isEmpty(segmentsToCheck)) { + private static List<String> getCommittingSegments(String realtimeTableName, Collection<String> segmentsToCheck, + BiFunction<String, String, SegmentZKMetadata> zkMetadataProvider) { + if (segmentsToCheck.isEmpty()) { return List.of(); } List<String> committingSegments = new ArrayList<>(segmentsToCheck.size()); for (String segment : segmentsToCheck) { - SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(realtimeTableName, segment); + SegmentZKMetadata segmentZKMetadata = zkMetadataProvider.apply(realtimeTableName, segment); if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == Status.COMMITTING) { committingSegments.add(segment); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 7607f2114a0..520daaeea3b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -127,7 +127,6 @@ public class SegmentCompletionManager { String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName()); String segmentName = llcSegmentName.getSegmentName(); SegmentZKMetadata segmentMetadata = _segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null); - Preconditions.checkState(segmentMetadata != null, "Failed to find ZK metadata for segment: %s", segmentName); TableConfig tableConfig = _segmentManager.getTableConfig(realtimeTableName); String factoryName = null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org