This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ec20da88 Fix starvation in consumer lock (#15404)
20ec20da88 is described below

commit 20ec20da88cf822705043f38213e435626a33de6
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Mar 28 22:25:40 2025 -0600

    Fix starvation in consumer lock (#15404)
---
 .../core/data/manager/BaseTableDataManager.java    |   5 +
 .../data/manager/realtime/ConsumerCoordinator.java | 290 +++++++++------------
 .../realtime/RealtimeSegmentDataManager.java       |  34 +--
 .../manager/realtime/RealtimeTableDataManager.java |  97 +++----
 .../realtime/SegmentAlreadyConsumedException.java  |  26 --
 .../manager/realtime/ConsumerCoordinatorTest.java  | 148 ++++-------
 .../realtime/RealtimeSegmentDataManagerTest.java   |  55 ----
 7 files changed, 229 insertions(+), 426 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 13edaf56cf..0070c8562f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -344,6 +344,11 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected abstract void doAddOnlineSegment(String segmentName)
       throws Exception;
 
+  @Nullable
+  public SegmentZKMetadata fetchZKMetadataNullable(String segmentName) {
+    return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentName);
+  }
+
   @Override
   public SegmentZKMetadata fetchZKMetadata(String segmentName) {
     SegmentZKMetadata zkMetadata =
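
A minimal caller-side sketch (hypothetical surrounding code) of how the new nullable variant differs from fetchZKMetadata, which treats missing metadata as an error: a null return means the segment's ZK metadata is gone, i.e. the segment was deleted.

    // Sketch only: tableDataManager is an assumed BaseTableDataManager instance.
    SegmentZKMetadata zkMetadata = tableDataManager.fetchZKMetadataNullable(segmentName);
    if (zkMetadata == null) {
      // Segment was deleted from the property store; stop consuming it.
      return;
    }
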
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
index 8d82a17e9a..6510418598 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -34,9 +34,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,61 +47,63 @@ public class ConsumerCoordinator {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCoordinator.class);
   private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3);
 
-  private final Semaphore _semaphore;
   private final boolean _enforceConsumptionInOrder;
-  private final Condition _condition;
-  private final Lock _lock;
-  private final ServerMetrics _serverMetrics;
-  private final boolean _alwaysRelyOnIdealState;
   private final RealtimeTableDataManager _realtimeTableDataManager;
-  private final AtomicBoolean _firstTransitionProcessed;
+  private final boolean _useIdealStateToCalculatePreviousSegment;
+  private final ServerMetrics _serverMetrics;
+
+  // We use a semaphore of 1 permit instead of a lock because the semaphore is shared across multiple threads, and it
+  // can be released by a different thread than the one that acquired it. There is no out-of-the-box Lock
+  // implementation that allows releasing the lock from a different thread.
+  private final Semaphore _semaphore = new Semaphore(1);
+  private final Lock _lock = new ReentrantLock();
+  private final Condition _condition = _lock.newCondition();
+  private final AtomicBoolean _firstTransitionProcessed = new AtomicBoolean(false);
 
-  private volatile int _maxSegmentSeqNumRegistered = -1;
+  private volatile int _maxSequenceNumberRegistered = -1;
 
   public ConsumerCoordinator(boolean enforceConsumptionInOrder, RealtimeTableDataManager realtimeTableDataManager) {
-    _semaphore = new Semaphore(1);
-    _lock = new ReentrantLock();
-    _condition = _lock.newCondition();
     _enforceConsumptionInOrder = enforceConsumptionInOrder;
     _realtimeTableDataManager = realtimeTableDataManager;
     StreamIngestionConfig streamIngestionConfig = realtimeTableDataManager.getStreamIngestionConfig();
-    if (streamIngestionConfig != null) {
-      // if isUseIdealStateToCalculatePreviousSegment is true, server relies on ideal state to fetch previous segment
-      // to a segment for all helix transitions.
-      _alwaysRelyOnIdealState = streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
-    } else {
-      _alwaysRelyOnIdealState = false;
-    }
-    _firstTransitionProcessed = new AtomicBoolean(false);
+    _useIdealStateToCalculatePreviousSegment =
+        streamIngestionConfig != null && streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
     _serverMetrics = ServerMetrics.get();
   }
 
   public void acquire(LLCSegmentName llcSegmentName)
-      throws InterruptedException {
+      throws InterruptedException, ShouldNotConsumeException {
+    String segmentName = llcSegmentName.getSegmentName();
     if (_enforceConsumptionInOrder) {
       long startTimeMs = System.currentTimeMillis();
-      waitForPrevSegment(llcSegmentName);
+      SegmentZKMetadata segmentZKMetadata = waitForPreviousSegment(llcSegmentName);
       _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_WAIT_TIME_MS,
           System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
 
-      if (isSegmentAlreadyConsumed(llcSegmentName.getSegmentName())) {
-        // if segment is already consumed, just return from here.
-        // NOTE: if segment is deleted, this segment will never be registered and helix thread waiting on
-        // watermark for prev segment won't be notified. All such helix threads will fallback to rely on ideal
-        // state for previous segment.
-        throw new SegmentAlreadyConsumedException(llcSegmentName.getSegmentName());
+      // When consumption order is enforced, unless the segment is deleted, we wait until the previous segment is
+      // registered regardless of whether ZK metadata status has changed to guarantee the consumption ordering.
+      //
+      // Prevent the following scenario:
+      // - Seg 100 (OFFLINE -> CONSUMING pending)
+      //
+      // - Seg 101 (OFFLINE -> CONSUMING returned because of status change)
+      // - Seg 101 (CONSUMING -> ONLINE processed)
+      //
+      // - Seg 102 (OFFLINE -> CONSUMING started consuming while 100 is not registered)
+      if (segmentZKMetadata != null) {
+        checkSegmentStatus(segmentZKMetadata);
       }
     }
 
     long startTimeMs = System.currentTimeMillis();
     while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
-      String currSegmentName = llcSegmentName.getSegmentName();
-      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", currSegmentName,
+      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", segmentName,
           System.currentTimeMillis() - startTimeMs);
-
-      if (isSegmentAlreadyConsumed(currSegmentName)) {
-        throw new SegmentAlreadyConsumedException(currSegmentName);
+      SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadataNullable(segmentName);
+      if (segmentZKMetadata == null) {
+        throw new ShouldNotConsumeException("Segment: " + segmentName + " is deleted");
       }
+      checkSegmentStatus(segmentZKMetadata);
     }
   }
 
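The class comment above motivates the single-permit Semaphore: a ReentrantLock must be unlocked by the thread that locked it, while a Semaphore permit may be released by any thread. A self-contained sketch of that difference:

    import java.util.concurrent.Semaphore;

    public class CrossThreadRelease {
      public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire(); // acquired on the main thread
        // A different thread returns the permit, mirroring how a consuming
        // segment's permit is released by whichever thread commits it.
        Thread releaser = new Thread(semaphore::release);
        releaser.start();
        releaser.join();
        semaphore.acquire(); // succeeds: the permit was released cross-thread
        System.out.println("reacquired after cross-thread release");
      }
    }
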
@@ -115,177 +116,126 @@ public class ConsumerCoordinator {
     return _semaphore;
   }
 
-  public void trackSegment(LLCSegmentName llcSegmentName) {
+  public void register(LLCSegmentName llcSegmentName) {
     _lock.lock();
     try {
-      if (!_alwaysRelyOnIdealState) {
-        _maxSegmentSeqNumRegistered = Math.max(_maxSegmentSeqNumRegistered, llcSegmentName.getSequenceNumber());
+      int sequenceNumber = llcSegmentName.getSequenceNumber();
+      if (sequenceNumber > _maxSequenceNumberRegistered) {
+        _maxSequenceNumberRegistered = sequenceNumber;
+        // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded
+        _condition.signalAll();
       }
-      // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded
-      _condition.signalAll();
     } finally {
       _lock.unlock();
     }
   }
 
-  private void waitForPrevSegment(LLCSegmentName currSegment)
-      throws InterruptedException {
-
-    if (_alwaysRelyOnIdealState || !_firstTransitionProcessed.get()) {
-      // if _alwaysRelyOnIdealState or no offline -> consuming transition has been processed, it means rely on
-      // ideal state to fetch previous segment.
-      awaitForPreviousSegmentFromIdealState(currSegment);
-
-      // the first transition will always be prone to error, consider edge case where segment previous to current
-      // helix transition's segment was deleted and this server came alive after successful deletion. the prev
-      // segment will not exist, hence first transition is handled using isFirstTransitionSuccessful.
-      _firstTransitionProcessed.set(true);
-      return;
-    }
-
-    // rely on _maxSegmentSeqNumRegistered watermark for previous segment.
-    if (awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS)) {
-      return;
-    }
-
-    // tried using prevSegSeqNumber watermark, but could not acquire the previous segment.
-    // fallback to acquire prev segment from ideal state.
-    awaitForPreviousSegmentFromIdealState(currSegment);
-  }
-
-  private void awaitForPreviousSegmentFromIdealState(LLCSegmentName currSegment)
-      throws InterruptedException {
-    String previousSegment = getPreviousSegmentFromIdealState(currSegment);
-    if (previousSegment == null) {
-      // previous segment can only be null if either all the previous segments are deleted or this is the starting
-      // sequence segment of the partition Group.
-      return;
-    }
-
-    SegmentDataManager segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
-    try {
-      long startTimeMs = System.currentTimeMillis();
-      _lock.lock();
-      try {
-        while (segmentDataManager == null) {
-          // if segmentDataManager == null, it means segment is not loaded in the server.
-          // wait until it's loaded.
-          if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
-            LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.",
-                currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs);
-
-            // waited until timeout, fetch previous segment again from ideal state as previous segment might be
-            // changed in ideal state.
-            previousSegment = getPreviousSegmentFromIdealState(currSegment);
-            if (previousSegment == null) {
-              return;
-            }
-          }
-          segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
-        }
-      } finally {
-        _lock.unlock();
-      }
-    } finally {
-      if (segmentDataManager != null) {
-        _realtimeTableDataManager.releaseSegment(segmentDataManager);
+  /**
+   * Waits for the previous segment to be registered to the server. Returns the segment ZK metadata fetched during the
+   * wait to reduce unnecessary ZK reads.
+   */
+  @Nullable
+  private SegmentZKMetadata waitForPreviousSegment(LLCSegmentName currentSegment)
+      throws InterruptedException, ShouldNotConsumeException {
+    if (!_firstTransitionProcessed.get() || _useIdealStateToCalculatePreviousSegment) {
+      SegmentZKMetadata segmentZKMetadata = null;
+      if (_maxSequenceNumberRegistered < currentSegment.getSequenceNumber() - 1) {
+        int previousSegmentSequenceNumber = getPreviousSegmentSequenceNumberFromIdealState(currentSegment);
+        segmentZKMetadata = waitForPreviousSegment(currentSegment, previousSegmentSequenceNumber);
       }
+      _firstTransitionProcessed.set(true);
+      return segmentZKMetadata;
+    } else {
+      return waitForPreviousSegment(currentSegment, currentSegment.getSequenceNumber() - 1);
     }
   }
 
-  /***
-   * @param currSegment is the segment of current helix transition.
-   * @param timeoutMs is max time to wait in millis
-   * @return true if previous Segment was registered to the server, else false.
-   * @throws InterruptedException
+  /**
+   * Waits for the previous segment with the sequence number to be registered to the server. Returns the segment ZK
+   * metadata fetched during the wait to reduce unnecessary ZK reads.
    */
+  @Nullable
   @VisibleForTesting
-  boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment, long timeoutMs)
-      throws InterruptedException {
+  SegmentZKMetadata waitForPreviousSegment(LLCSegmentName currentSegment, int previousSegmentSequenceNumber)
+      throws InterruptedException, ShouldNotConsumeException {
+    if (previousSegmentSequenceNumber <= _maxSequenceNumberRegistered) {
+      return null;
+    }
+    SegmentZKMetadata segmentZKMetadata = null;
     long startTimeMs = System.currentTimeMillis();
-    int prevSeqNum = currSegment.getSequenceNumber() - 1;
     _lock.lock();
     try {
-      while (_maxSegmentSeqNumRegistered < prevSeqNum) {
+      while (previousSegmentSequenceNumber > _maxSequenceNumberRegistered) {
         // it means the previous segment is not loaded in the server. Wait until it's loaded.
-        if (!_condition.await(timeoutMs, TimeUnit.MILLISECONDS)) {
-          LOGGER.warn(
-              "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.",
-              currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs);
-
-          // waited until the timeout. Rely on ideal state now.
-          return _maxSegmentSeqNumRegistered >= prevSeqNum;
+        if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+          String segmentName = currentSegment.getSegmentName();
+          LOGGER.warn("Waited on previous segment with sequence number: {} for: {}ms. "
+                  + "Refreshing the previous segment sequence number for current segment: {}",
+              previousSegmentSequenceNumber, System.currentTimeMillis() - startTimeMs, segmentName);
+          segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadataNullable(segmentName);
+          if (segmentZKMetadata == null) {
+            throw new ShouldNotConsumeException("Segment: " + segmentName + " is deleted");
+          }
+          previousSegmentSequenceNumber = getPreviousSegmentSequenceNumberFromIdealState(currentSegment);
         }
       }
-      return true;
+      return segmentZKMetadata;
     } finally {
       _lock.unlock();
     }
   }
 
   @VisibleForTesting
-  @Nullable
-  String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) {
+  int getPreviousSegmentSequenceNumberFromIdealState(LLCSegmentName currentSegment) {
     long startTimeMs = System.currentTimeMillis();
-    // if seq num of current segment is 102, maxSequenceNumBelowCurrentSegment must be highest seq num of any segment
-    // created before current segment
-    int maxSequenceNumBelowCurrentSegment = -1;
-    String previousSegment = null;
-    int currPartitionGroupId = currSegment.getPartitionGroupId();
-    int currSequenceNum = currSegment.getSequenceNumber();
-    Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment();
-    String currentServerInstanceId = _realtimeTableDataManager.getServerInstance();
-
-    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
-      String segmentName = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      String state = instanceStateMap.get(currentServerInstanceId);
-
-      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+    // Track the highest sequence number of any segment created before the current segment. If there is none, return -1
+    // so that it can always pass the check.
+    int maxSequenceNumberBelowCurrentSegment = -1;
+    String instanceId = _realtimeTableDataManager.getServerInstance();
+    int partitionId = currentSegment.getPartitionGroupId();
+    int currentSequenceNumber = currentSegment.getSequenceNumber();
+
+    for (Map.Entry<String, Map<String, String>> entry : getSegmentAssignment().entrySet()) {
+      String state = entry.getValue().get(instanceId);
+      if (!SegmentStateModel.ONLINE.equals(state)) {
         // if server is looking for previous segment to current transition's segment, it means the previous segment
         // has to be online in the instance. If all previous segments are not online, we just allow the current helix
         // transition to go ahead.
         continue;
       }
 
-      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
       if (llcSegmentName == null) {
         // ignore uploaded segments
         continue;
       }
 
-      if (llcSegmentName.getPartitionGroupId() != currPartitionGroupId) {
+      if (llcSegmentName.getPartitionGroupId() != partitionId) {
         // ignore segments of different partitions.
         continue;
       }
 
-      if (llcSegmentName.getSequenceNumber() >= currSequenceNum) {
-        // ignore segments with higher sequence number than existing helix transition segment.
-        continue;
-      }
-
-      if (llcSegmentName.getSequenceNumber() > maxSequenceNumBelowCurrentSegment) {
-        maxSequenceNumBelowCurrentSegment = llcSegmentName.getSequenceNumber();
-        // also track the name of segment
-        previousSegment = segmentName;
+      int sequenceNumber = llcSegmentName.getSequenceNumber();
+      if (sequenceNumber > maxSequenceNumberBelowCurrentSegment && sequenceNumber < currentSequenceNumber) {
+        maxSequenceNumberBelowCurrentSegment = sequenceNumber;
       }
     }
 
     long timeSpentMs = System.currentTimeMillis() - startTimeMs;
-    LOGGER.info("Fetched previous segment: {} to current segment: {} in: 
{}ms.", previousSegment,
-        currSegment.getSegmentName(), timeSpentMs);
+    LOGGER.info("Fetched previous segment sequence number: {} to current 
segment: {} in: {}ms.",
+        maxSequenceNumberBelowCurrentSegment, currentSegment.getSegmentName(), 
timeSpentMs);
     _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
         ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, 
TimeUnit.MILLISECONDS);
 
-    return previousSegment;
+    return maxSequenceNumberBelowCurrentSegment;
   }
 
   @VisibleForTesting
   Map<String, Map<String, String>> getSegmentAssignment() {
-    IdealState idealState = HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(),
-        _realtimeTableDataManager.getTableName());
-    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s",
-        _realtimeTableDataManager.getTableName());
+    String realtimeTableName = _realtimeTableDataManager.getTableName();
+    IdealState idealState =
+        HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), realtimeTableName);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", realtimeTableName);
     return idealState.getRecord().getMapFields();
   }
 
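The register/waitForPreviousSegment pair above is a classic guarded condition wait on a watermark: waiters block while the watermark is below their target, and every registration that advances it wakes all waiters to re-check. A stripped-down sketch of the same pattern (hypothetical names, without the ZK refresh logic):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class SequenceWatermark {
      private final Lock _lock = new ReentrantLock();
      private final Condition _condition = _lock.newCondition();
      private int _maxRegistered = -1;

      // Blocks until the watermark reaches the target; false on timeout.
      boolean awaitAtLeast(int target, long timeoutMs) throws InterruptedException {
        _lock.lock();
        try {
          while (_maxRegistered < target) {
            if (!_condition.await(timeoutMs, TimeUnit.MILLISECONDS)) {
              return false; // caller may refresh external state and retry
            }
          }
          return true;
        } finally {
          _lock.unlock();
        }
      }

      void register(int sequenceNumber) {
        _lock.lock();
        try {
          if (sequenceNumber > _maxRegistered) {
            _maxRegistered = sequenceNumber;
            _condition.signalAll(); // wake all waiters to re-check
          }
        } finally {
          _lock.unlock();
        }
      }
    }
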
@@ -299,26 +249,32 @@ public class ConsumerCoordinator {
     return _firstTransitionProcessed;
   }
 
-  // this should not be used outside of tests.
   @VisibleForTesting
-  int getMaxSegmentSeqNumLoaded() {
-    return _maxSegmentSeqNumRegistered;
+  int getMaxSequenceNumberRegistered() {
+    return _maxSequenceNumberRegistered;
   }
 
-  @VisibleForTesting
-  boolean isSegmentAlreadyConsumed(String currSegmentName) {
-    SegmentZKMetadata segmentZKMetadata = _realtimeTableDataManager.fetchZKMetadata(currSegmentName);
-    if (segmentZKMetadata == null) {
-      // segment is deleted. no need to consume.
-      LOGGER.warn("Skipping consumption for segment: {} because ZK metadata does not exists.", currSegmentName);
-      return true;
-    }
+  private static void checkSegmentStatus(SegmentZKMetadata segmentZKMetadata)
+      throws ShouldNotConsumeException {
     if (segmentZKMetadata.getStatus().isCompleted()) {
-      // if segment is done or uploaded, no need to consume.
-      LOGGER.warn("Skipping consumption for segment: {} because ZK status is already marked as completed.",
-          currSegmentName);
-      return true;
+      throw new ShouldNotConsumeException(
+          "Segment: " + segmentZKMetadata.getSegmentName() + " is already completed with status: "
+              + segmentZKMetadata.getStatus());
+    }
+  }
+
+  /**
+   * This exception is thrown when attempting to acquire the consumer semaphore for a segment that should not be
+   * consumed anymore:
+   * - Segment is in completed status (DONE/UPLOADED)
+   * - Segment is deleted
+   *
+   * We allow consumption when the segment is COMMITTING (for pauseless consumption) because there is no guarantee
+   * that the segment will be committed soon. This way a slow server can still catch up.
+   */
+  public static class ShouldNotConsumeException extends Exception {
+    public ShouldNotConsumeException(String message) {
+      super(message);
     }
-    return false;
   }
 }
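
Because ShouldNotConsumeException is now a checked exception, every acquire() call site must decide what "should not consume" means for it. The RealtimeSegmentDataManager change below handles it by logging and skipping; in sketch form:

    try {
      _consumerCoordinator.acquire(_llcSegmentName);
    } catch (ConsumerCoordinator.ShouldNotConsumeException e) {
      // Segment was deleted or already completed; skip consumption entirely.
      _segmentLogger.info("Skipping consumption because: {}", e.getMessage());
      return;
    }
    // From here on, this thread holds the partition's single consumer permit.
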
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index a5ac3c2e95..0979782428 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
   private final Schema _schema;
+  private final LLCSegmentName _llcSegmentName;
   private final AtomicBoolean _streamConsumerClosed = new AtomicBoolean(false);
   // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers
   // from consuming with the same partitionGroupId in parallel in the same host.
@@ -734,6 +735,16 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
           } while (!_shouldStop && !_isReadyToConsumeData.getAsBoolean());
         }
 
+        // Acquire semaphore before consuming data
+        try {
+          _consumerCoordinator.acquire(_llcSegmentName);
+        } catch (ConsumerCoordinator.ShouldNotConsumeException e) {
+          _segmentLogger.info("Skipping consumption because: {}", e.getMessage());
+          return;
+        }
+        _consumerSemaphoreAcquired.set(true);
+        _consumerCoordinator.register(_llcSegmentName);
+
         // TODO:
         //   When reaching here, the current consuming segment has already acquired the consumer semaphore, but there is
         //   no guarantee that the previous consuming segment is already persisted (replaced with immutable segment). It
@@ -1068,16 +1079,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _segmentBuildDescriptor;
   }
 
-  @VisibleForTesting
-  Semaphore getPartitionGroupConsumerSemaphore() {
-    return _consumerCoordinator.getSemaphore();
-  }
-
-  @VisibleForTesting
-  AtomicBoolean getConsumerSemaphoreAcquired() {
-    return _consumerSemaphoreAcquired;
-  }
-
   @VisibleForTesting
   protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
     if (_parallelSegmentConsumptionPolicy.isAllowedDuringBuild()) {
@@ -1569,6 +1570,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _schema = schema;
+    _llcSegmentName = llcSegmentName;
+    _consumerCoordinator = consumerCoordinator;
     _serverMetrics = serverMetrics;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
@@ -1603,7 +1606,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
             _segmentZKMetadata.getEndOffset() == null ? null
                 : _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
             _segmentZKMetadata.getStatus().toString());
-    _consumerCoordinator = consumerCoordinator;
     InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig();
     String clientIdSuffix =
         instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
@@ -1694,16 +1696,6 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       throw e;
     }
 
-    // Acquire semaphore to create stream consumers
-    try {
-      _consumerCoordinator.acquire(llcSegmentName);
-      _consumerSemaphoreAcquired.set(true);
-    } catch (InterruptedException e) {
-      String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore";
-      _segmentLogger.error(errorMsg);
-      throw new RuntimeException(errorMsg + " for segment: " + _segmentNameStr);
-    }
-
     try {
       _startOffset = _partitionGroupConsumptionStatus.getStartOffset();
       _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
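
Note the error-handling shift: the removed constructor block above wrapped InterruptedException in a RuntimeException, whereas the blocking acquire now runs on the consumer thread, which can deal with interruption directly. Where a blocking acquire must be absorbed rather than propagated, the usual Java idiom (a sketch, not this commit's exact handling) is to restore the interrupt flag so the shutdown request stays observable:

    try {
      semaphore.acquire(); // any blocking acquire; assumes a Semaphore in scope
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for callers
      return; // stop work; the thread was asked to shut down
    }
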
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index fdc74f3918..53ced97032 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -99,17 +99,16 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private RealtimeSegmentStatsHistory _statsHistory;
   private final Semaphore _segmentBuildSemaphore;
 
-  // Maintains a map of partition id to semaphore.
+  // Maintains a map from partition id to consumer coordinator. The consumer coordinator uses a semaphore to ensure
+  // that exactly one PartitionConsumer instance consumes from any stream partition.
+  // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host)
+  // consuming from the same stream partition can lead to bugs.
   // We use a semaphore of 1 permit instead of a lock because the semaphore is shared across multiple threads, and it
   // can be released by a different thread than the one that acquired it. There is no out-of-the-box Lock
   // implementation that allows releasing the lock from a different thread.
-  // The semaphore ensures that exactly one PartitionConsumer instance consumes from any stream partition.
-  // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host)
-  // consuming from the same stream partition can lead to bugs.
-  // The semaphores will stay in the hash map even if the consuming partitions move to a different host.
-  // We expect that there will be a small number of semaphores, but that may be ok.
-  private final Map<Integer, ConsumerCoordinator> _partitionGroupIdToConsumerCoordinatorMap =
-      new ConcurrentHashMap<>();
+  // The consumer coordinators will stay in the map even if the consuming partitions move to a different server. We
+  // expect a small number of consumer coordinators, so it should be fine to not remove them.
+  private final Map<Integer, ConsumerCoordinator> _partitionIdToConsumerCoordinatorMap = new ConcurrentHashMap<>();
   // The old name of the stats file used to be stats.ser which we changed when we moved all packages
   // from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes
   private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -474,23 +473,26 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager == null) {
       addNewOnlineSegment(zkMetadata, indexLoadingConfig);
-      return;
-    }
-    if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+    } else if (segmentDataManager instanceof RealtimeSegmentDataManager) {
       _logger.info("Changing segment: {} from CONSUMING to ONLINE", 
segmentName);
       ((RealtimeSegmentDataManager) 
segmentDataManager).goOnlineFromConsuming(zkMetadata);
       onConsumingToOnline(segmentName);
-      return;
-    }
-    // For pauseless ingestion, the segment is marked ONLINE before it's built and before the COMMIT_END_METADATA
-    // call completes.
-    // The server should replace the segment only after the CRC is set by COMMIT_END_METADATA and the segment is
-    // marked DONE.
-    // This ensures the segment's download URL is available before discarding the locally built copy, preventing
-    // data loss if COMMIT_END_METADATA fails.
-    if (zkMetadata.getStatus() == Status.DONE) {
+    } else if (zkMetadata.getStatus() == Status.DONE) {
+      // For pauseless ingestion, the segment is marked ONLINE before it's built and before the COMMIT_END_METADATA
+      // call completes.
+      // The server should replace the segment only after the CRC is set by COMMIT_END_METADATA and the segment is
+      // marked DONE.
+      // This ensures the segment's download URL is available before discarding the locally built copy, preventing
+      // data loss if COMMIT_END_METADATA fails.
       replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, indexLoadingConfig);
     }
+    // Register the segment into the consumer coordinator if consumption order is enforced.
+    if (_enforceConsumptionInOrder) {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName != null) {
+        getConsumerCoordinator(llcSegmentName.getPartitionGroupId()).register(llcSegmentName);
+      }
+    }
   }
 
   @Override
@@ -516,11 +518,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private void doAddConsumingSegment(String segmentName)
       throws AttemptsExceededException, RetriableOperationException {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
-    if ((!_enforceConsumptionInOrder) && ((zkMetadata == null) || (zkMetadata.getStatus().isCompleted()))) {
-      // NOTE: We do not throw exception here because the segment might have just been committed before the state
-      //       transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in
-      //       Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it.
-      _logger.warn("Segment: {} is already consumed, skipping adding it as CONSUMING segment", segmentName);
+    if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
+      // NOTE:
+      // 1. When consumption order is enforced, we always create the RealtimeSegmentDataManager to coordinate the
+      //    consumption.
+      // 2. When segment is COMMITTING (for pauseless consumption), we still create the RealtimeSegmentDataManager
+      //    because there is no guarantee that the segment will be committed soon. This way the slow server can still
+      //    catch up.
+      // 3. We do not throw exception here because the segment might have just been committed before the state
+      //    transition is processed. We can skip adding this segment, and the segment will enter CONSUMING state in
+      //    Helix, then we can rely on the following CONSUMING -> ONLINE state transition to add it.
+      _logger.warn("Segment: {} is already completed, skipping adding it as CONSUMING segment", segmentName);
       return;
     }
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
@@ -557,22 +565,10 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         _tableDedupMetadataManager != null ? _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
             : null;
-    RealtimeSegmentDataManager realtimeSegmentDataManager;
-    try {
-      realtimeSegmentDataManager =
-          createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName,
-              consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager,
-              _isTableReadyToConsumeData);
-    } catch (SegmentAlreadyConsumedException e) {
-      // Don't register segment.
-      // If segment is not deleted, Eventually this server should receive a CONSUMING -> ONLINE helix state transition.
-      // If consumption in order is enforced:
-      // 1. If segment was deleted: Helix thread waiting on this deleted segment will fallback to fetch prev segment
-      //    from ideal state.
-      // 2. If segment is not deleted, Helix thread waiting on this segment will be notified and unblocked during
-      //    consuming -> online transition of this segment.
-      return;
-    }
+    RealtimeSegmentDataManager realtimeSegmentDataManager =
+        createRealtimeSegmentDataManager(zkMetadata, tableConfig, indexLoadingConfig, schema, llcSegmentName,
+            consumerCoordinator, partitionUpsertMetadataManager, partitionDedupMetadataManager,
+            _isTableReadyToConsumeData);
     registerSegment(segmentName, realtimeSegmentDataManager, partitionUpsertMetadataManager);
     if (partitionUpsertMetadataManager != null) {
       partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
@@ -821,21 +817,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     registerSegment(segmentName, segmentDataManager);
   }
 
-  @Override
-  protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) {
-    SegmentDataManager oldSegmentDataManager = super.registerSegment(segmentName, segmentDataManager);
-    if (_enforceConsumptionInOrder) {
-      // helix threads might be waiting for their respective previous segments to be loaded.
-      // they need to be notified here.
-      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
-      if (llcSegmentName != null) {
-        ConsumerCoordinator consumerCoordinator = getConsumerCoordinator(llcSegmentName.getPartitionGroupId());
-        consumerCoordinator.trackSegment(llcSegmentName);
-      }
-    }
-    return oldSegmentDataManager;
-  }
-
   /**
    * Replaces the CONSUMING segment with a downloaded committed one.
    */
@@ -892,8 +873,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   @VisibleForTesting
-  ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) {
-    return _partitionGroupIdToConsumerCoordinatorMap.computeIfAbsent(partitionGroupId,
+  ConsumerCoordinator getConsumerCoordinator(int partitionId) {
+    return _partitionIdToConsumerCoordinatorMap.computeIfAbsent(partitionId,
         k -> new ConsumerCoordinator(_enforceConsumptionInOrder, this));
   }
 
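getConsumerCoordinator relies on ConcurrentHashMap.computeIfAbsent, which runs the factory at most once per key even when state-transition threads race on the same partition id, so all of them share one coordinator. The idiom in isolation (a sketch with assumed types):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class CoordinatorRegistry {
      private final Map<Integer, ConsumerCoordinator> _coordinators = new ConcurrentHashMap<>();

      // Racing threads all receive the same instance for a partition id;
      // entries are intentionally never removed (the map stays small).
      ConsumerCoordinator getOrCreate(int partitionId, boolean enforceOrder, RealtimeTableDataManager manager) {
        return _coordinators.computeIfAbsent(partitionId, k -> new ConsumerCoordinator(enforceOrder, manager));
      }
    }
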
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
deleted file mode 100644
index 6b04920dff..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentAlreadyConsumedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-public class SegmentAlreadyConsumedException extends RuntimeException {
-
-  public SegmentAlreadyConsumedException(String currSegmentName) {
-    super("Skipping consumption for segment: " + currSegmentName);
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
index 1d8ec5bc0a..775ecffc3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
@@ -25,16 +25,15 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 
+// TODO: Replace the sleeps in this test with condition waits
 public class ConsumerCoordinatorTest {
 
   private static class FakeRealtimeTableDataManager extends RealtimeTableDataManager {
@@ -98,15 +97,10 @@ public class ConsumerCoordinatorTest {
     public Map<String, Map<String, String>> getSegmentAssignment() {
       return _segmentAssignmentMap;
     }
-
-    @Override
-    public boolean isSegmentAlreadyConsumed(String currSegmentName) {
-      return false;
-    }
   }
 
   @Test
-  public void testAwaitForPreviousSegmentSequenceNumber()
+  public void testWaitForPreviousSegment()
       throws InterruptedException {
     // 1. enable tracking segment seq num.
     FakeRealtimeTableDataManager realtimeTableDataManager = new FakeRealtimeTableDataManager(null, false);
@@ -117,40 +111,24 @@ public class ConsumerCoordinatorTest {
     // 2. check if thread waits on prev segment seq
     AtomicBoolean atomicBoolean = new AtomicBoolean(false);
     Thread thread1 = new Thread(() -> {
-      LLCSegmentName llcSegmentName = getLLCSegment(101);
+      LLCSegmentName currentSegment = getLLCSegment(101);
       try {
-        boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000);
-        atomicBoolean.set(b);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+        consumerCoordinator.waitForPreviousSegment(currentSegment, 100);
+        atomicBoolean.set(true);
+      } catch (Exception e) {
+        Assert.fail();
       }
     });
     thread1.start();
+    Thread.sleep(1000);
+    Assert.assertFalse(atomicBoolean.get());
 
     // 3. add prev segment and check if thread is unblocked.
-    consumerCoordinator.trackSegment(getLLCSegment(100));
+    consumerCoordinator.register(getLLCSegment(100));
 
-    TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000,
+    TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 5000,
         "Thread waiting on previous segment should have been unblocked.");
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
-
-    // 4. check if second thread waits on prev segment seq until timeout and 
returns false
-    AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
-    Thread thread2 = new Thread(() -> {
-      LLCSegmentName llcSegmentName = getLLCSegment(102);
-      try {
-        boolean b = consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500);
-        atomicBoolean2.set(b);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    });
-    thread2.start();
-
-    Thread.sleep(1500);
-
-    Assert.assertFalse(atomicBoolean2.get());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 100);
   }
 
   @Test
@@ -185,15 +163,15 @@ public class ConsumerCoordinatorTest {
     Thread.sleep(1000);
 
     // 3. load segment 100, 101, 102
-    realtimeTableDataManager.registerSegment(getSegmentName(100), mockedRealtimeSegmentDataManager);
-    realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager);
-    realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(100));
+    consumerCoordinator.register(getLLCSegment(101));
+    consumerCoordinator.register(getLLCSegment(102));
     Thread.sleep(1000);
 
     // 4. check all of the above threads wait
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
     Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
 
     thread2.start();
@@ -205,27 +183,27 @@ public class ConsumerCoordinatorTest {
     // 5. check that first thread acquiring semaphore is of segment 104
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
 
-    realtimeTableDataManager.registerSegment(getSegmentName(104), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(104));
     Thread.sleep(1000);
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 104);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1);
 
     // 6. check the next threads acquiring semaphore is 106
     consumerCoordinator.getSemaphore().release();
-    realtimeTableDataManager.registerSegment(getSegmentName(106), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(106));
 
     Thread.sleep(1000);
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 106);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1);
   }
@@ -249,26 +227,24 @@ public class ConsumerCoordinatorTest {
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), -1);
     Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
 
-    RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = getMockedRealtimeSegmentDataManager();
-
     // 3. register older segment and check seq num watermark and semaphore.
-    realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(90));
     Thread.sleep(1000);
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 90);
     Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
 
     // 4. register prev segment and check watermark and if thread was unblocked
-    realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(91));
     Thread.sleep(1000);
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
 
     // 5. check that all the following transitions rely on seq num watermark and gets blocked.
@@ -283,7 +259,7 @@ public class ConsumerCoordinatorTest {
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
 
     // 6. check that all above threads are still blocked even if semaphore is released.
@@ -293,25 +269,25 @@ public class ConsumerCoordinatorTest {
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
 
     // 6. mark 101 seg as complete. Check 102 acquired the semaphore.
-    realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(101));
 
     Thread.sleep(1000);
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 101);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0);
 
     // 7. register 102 seg, check if seg 103 is waiting on semaphore.
-    realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(102));
 
     Thread.sleep(1000);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1);
@@ -320,16 +296,16 @@ public class ConsumerCoordinatorTest {
     consumerCoordinator.getSemaphore().release();
     Thread.sleep(1000);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0);
 
     // 8. register 103 seg and check if seg 104 is now queued on semaphore
-    realtimeTableDataManager.registerSegment(getSegmentName(103), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(103));
     Thread.sleep(1000);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 103);
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 1);
   }
@@ -354,14 +330,14 @@ public class ConsumerCoordinatorTest {
     Assert.assertNotNull(realtimeTableDataManager);
 
     // prev segment has seq 91, so registering seq 90 won't do anything.
-    realtimeTableDataManager.registerSegment(getSegmentName(90), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(90));
 
     Thread.sleep(2000);
 
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
 
     // 2. test that registering prev segment will unblock thread.
-    realtimeTableDataManager.registerSegment(getSegmentName(91), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(91));
 
     TestUtils.waitForCondition(aVoid -> (consumerCoordinator.getSemaphore().availablePermits() == 0), 5000,
         "Semaphore must be acquired after registering previous segment");
@@ -372,8 +348,8 @@ public class ConsumerCoordinatorTest {
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 1);
     Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
-    realtimeTableDataManager.registerSegment(getSegmentName(101), mockedRealtimeSegmentDataManager);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 91);
+    consumerCoordinator.register(getLLCSegment(101));
 
     // 3. test that segment 103 will be blocked.
     Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
@@ -401,7 +377,7 @@ public class ConsumerCoordinatorTest {
     Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
 
     // 4. registering seg 102 should unblock seg 103
-    realtimeTableDataManager.registerSegment(getSegmentName(102), mockedRealtimeSegmentDataManager);
+    consumerCoordinator.register(getLLCSegment(102));
 
     Thread.sleep(1000);
 
@@ -416,12 +392,12 @@ public class ConsumerCoordinatorTest {
     Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 0);
     Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 0);
     Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
-    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+    Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
   }
 
   @Test
   public void testRandomOrder()
-      throws InterruptedException {
+      throws Exception {
     RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class);
     Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
 
@@ -450,51 +426,25 @@ public class ConsumerCoordinatorTest {
     String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
     LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
     Assert.assertNotNull(llcSegmentName);
-    String previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
-    Assert.assertEquals(previousSegment, "tableTest_REALTIME__1__91__20250304T0035Z");
+    int previousSegmentSequenceNumber =
+        consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName);
+    Assert.assertEquals(previousSegmentSequenceNumber, 91);
 
     consumerCoordinator.getSegmentAssignment().clear();
     Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
       put("server_3", "ONLINE");
     }};
     consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), serverSegmentStatusMap);
-    previousSegment = consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
-    Assert.assertNull(previousSegment);
-  }
-
-  @Test
-  public void testIfSegmentIsConsumed() {
-    RealtimeTableDataManager realtimeTableDataManager = Mockito.mock(RealtimeTableDataManager.class);
-    Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(null);
-
-    ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(true, realtimeTableDataManager);
-    Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-    SegmentZKMetadata mockSegmentZKMetadata = Mockito.mock(SegmentZKMetadata.class);
-
-    Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
-    Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-    Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-    Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.COMMITTING);
-    Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-    Assert.assertFalse(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-    Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.DONE);
-    Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-    Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
-
-    Mockito.when(mockSegmentZKMetadata.getStatus()).thenReturn(CommonConstants.Segment.Realtime.Status.UPLOADED);
-    Mockito.when(realtimeTableDataManager.fetchZKMetadata(getSegmentName(101))).thenReturn(mockSegmentZKMetadata);
-    Assert.assertTrue(consumerCoordinator.isSegmentAlreadyConsumed(getSegmentName(101)));
+    previousSegmentSequenceNumber = consumerCoordinator.getPreviousSegmentSequenceNumberFromIdealState(llcSegmentName);
+    Assert.assertEquals(previousSegmentSequenceNumber, -1);
   }
 
   private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, LLCSegmentName llcSegmentName) {
     return new Thread(() -> {
       try {
         consumerCoordinator.acquire(llcSegmentName);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+      } catch (Exception e) {
+        Assert.fail();
       }
     }, String.valueOf(llcSegmentName.getSequenceNumber()));
   }
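
The TODO at the top of this test file (replace the sleeps with condition waits) can reuse the TestUtils.waitForCondition helper the tests already call; a sketch of the substitution for one sleep-then-assert pair:

    // Instead of:
    //   Thread.sleep(1000);
    //   Assert.assertEquals(consumerCoordinator.getMaxSequenceNumberRegistered(), 102);
    // poll until the condition holds, failing only after the timeout:
    TestUtils.waitForCondition(aVoid -> consumerCoordinator.getMaxSequenceNumberRegistered() == 102, 5000,
        "Watermark should advance to 102 after registering the segment");
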
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index a332818164..8bf32c576b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -28,10 +28,8 @@ import java.time.Instant;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
@@ -70,7 +68,6 @@ import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -750,58 +747,6 @@ public class RealtimeSegmentDataManagerTest {
     segmentDataManager.close();
   }
 
-  @Test
-  public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition()
-      throws Exception {
-    long timeout = 10_000L;
-    FakeRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager();
-    Assert.assertTrue(firstSegmentDataManager.getConsumerSemaphoreAcquired().get());
-    Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
-    Assert.assertEquals(firstSemaphore.availablePermits(), 0);
-    Assert.assertFalse(firstSemaphore.hasQueuedThreads());
-
-    AtomicReference<FakeRealtimeSegmentDataManager> secondSegmentDataManager = new AtomicReference<>(null);
-
-    // Construct the second segment manager, which will be blocked on the semaphore.
-    Thread constructSecondSegmentManager = new Thread(() -> {
-      try {
-        secondSegmentDataManager.set(createFakeSegmentManager());
-      } catch (Exception e) {
-        throw new RuntimeException("Exception when sleeping for " + timeout + "ms", e);
-      }
-    });
-    constructSecondSegmentManager.start();
-
-    // Wait until the second segment manager gets blocked on the semaphore.
-    TestUtils.waitForCondition(aVoid -> {
-      if (firstSemaphore.hasQueuedThreads()) {
-        // Once verified the second segment gets blocked, release the semaphore.
-        firstSegmentDataManager.close();
-        return true;
-      } else {
-        return false;
-      }
-    }, timeout, "Failed to wait for the second segment blocked on semaphore");
-
-    // Wait for the second segment manager finished the construction.
-    TestUtils.waitForCondition(aVoid -> secondSegmentDataManager.get() != null, timeout,
-        "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms");
-
-    Assert.assertTrue(secondSegmentDataManager.get().getConsumerSemaphoreAcquired().get());
-    Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
-    Assert.assertEquals(firstSemaphore, secondSemaphore);
-    Assert.assertEquals(secondSemaphore.availablePermits(), 0);
-    Assert.assertFalse(secondSemaphore.hasQueuedThreads());
-
-    // Call offload method the 2nd time on the first segment manager, the permits in semaphore won't increase.
-    firstSegmentDataManager.close();
-    Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(), 0);
-
-    // The permit finally gets released in the Semaphore.
-    secondSegmentDataManager.get().close();
-    Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1);
-  }
-
   @Test
   public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
       throws Exception {

