This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf74f5ad6d Consumes segments in strict order of sequence number 
(#15261)
cf74f5ad6d is described below

commit cf74f5ad6dde98157f534e7781cbb177889cddac
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Thu Mar 27 07:12:08 2025 +0530

    Consumes segments in strict order of sequence number (#15261)
---
 .../apache/pinot/common/metrics/ServerTimer.java   |   6 +
 .../data/manager/realtime/ConsumerCoordinator.java | 291 +++++++++++++
 .../realtime/RealtimeSegmentDataManager.java       |  22 +-
 .../manager/realtime/RealtimeTableDataManager.java |  61 ++-
 .../manager/realtime/ConsumerCoordinatorTest.java  | 485 +++++++++++++++++++++
 .../realtime/RealtimeSegmentDataManagerTest.java   |  19 +-
 ...FailureInjectingRealtimeSegmentDataManager.java |  19 +-
 .../FailureInjectingRealtimeTableDataManager.java  |   5 +-
 .../table/ingestion/StreamIngestionConfig.java     |  27 +-
 9 files changed, 891 insertions(+), 44 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 1618f73047..917a187946 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -63,6 +63,12 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
       "Time spent waiting in the secondary queue when BinaryWorkloadScheduler 
is used."),
 
+  PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS("milliseconds", false,
+      "Time spent while fetching previous segment from ideal state for any 
segment."),
+
+  PREV_SEGMENT_WAIT_TIME_MS("milliseconds", false,
+      "Time spent while waiting on previous segment to be registered."),
+
   // Multi-stage
   /**
    * Time spent building the hash table for the join.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
new file mode 100644
index 0000000000..75ba5f73d4
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Coordinates the OFFLINE -> CONSUMING helix transitions of consuming segments.
+ *
+ * <p>{@code RealtimeTableDataManager} creates one instance per partition group. The coordinator guards a single
+ * consumer permit (a {@link Semaphore} with one permit), so only one consumer holding the permit can be active at a
+ * time. When {@code enforceConsumptionInOrder} is set, {@link #acquire} first waits until the segment preceding the
+ * transition's segment (by sequence number) has been registered on this server. The previous segment is found
+ * either from the {@link #trackSegment} sequence-number watermark or from the table's ideal state. The ideal state is
+ * used for the first transition, when {@code useIdealStateToCalculatePreviousSegment} is configured, and as the
+ * fallback when the watermark wait times out.
+ */
+public class ConsumerCoordinator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCoordinator.class);
+  // Length of a single wait interval, after which a warning is logged and the wait is re-evaluated.
+  private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3);
+
+  private final Semaphore _semaphore;
+  private final boolean _enforceConsumptionInOrder;
+  private final Condition _condition;
+  private final Lock _lock;
+  private final ServerMetrics _serverMetrics;
+  private final boolean _alwaysRelyOnIdealState;
+  private final RealtimeTableDataManager _realtimeTableDataManager;
+  private final AtomicBoolean _firstTransitionProcessed;
+
+  // Highest sequence number passed to trackSegment(). It is only maintained when _alwaysRelyOnIdealState is false.
+  // It is written under _lock; volatile so that test accessors can read it without the lock.
+  private volatile int _maxSegmentSeqNumRegistered = -1;
+
+  public ConsumerCoordinator(boolean enforceConsumptionInOrder, RealtimeTableDataManager realtimeTableDataManager) {
+    _semaphore = new Semaphore(1);
+    _lock = new ReentrantLock();
+    _condition = _lock.newCondition();
+    _enforceConsumptionInOrder = enforceConsumptionInOrder;
+    _realtimeTableDataManager = realtimeTableDataManager;
+    StreamIngestionConfig streamIngestionConfig = realtimeTableDataManager.getStreamIngestionConfig();
+    if (streamIngestionConfig != null) {
+      // if isUseIdealStateToCalculatePreviousSegment is true, server relies on ideal state to fetch previous segment
+      // to a segment for all helix transitions.
+      _alwaysRelyOnIdealState = streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
+    } else {
+      _alwaysRelyOnIdealState = false;
+    }
+    _firstTransitionProcessed = new AtomicBoolean(false);
+    _serverMetrics = ServerMetrics.get();
+  }
+
+  /**
+   * Acquires the consumer permit for the given segment.
+   *
+   * <p>When in-order consumption is enforced, this first blocks until the previous segment is registered and records
+   * the wait as {@code PREV_SEGMENT_WAIT_TIME_MS}. It then retries the permit indefinitely, logging a warning after
+   * every unsuccessful {@code WAIT_INTERVAL_MS} attempt.
+   *
+   * @param llcSegmentName segment of the current helix transition
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  public void acquire(LLCSegmentName llcSegmentName)
+      throws InterruptedException {
+    if (_enforceConsumptionInOrder) {
+      long startTimeMs = System.currentTimeMillis();
+      waitForPrevSegment(llcSegmentName);
+      _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(), ServerTimer.PREV_SEGMENT_WAIT_TIME_MS,
+          System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
+    }
+
+    long startTimeMs = System.currentTimeMillis();
+    while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
+      LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", llcSegmentName,
+          System.currentTimeMillis() - startTimeMs);
+    }
+  }
+
+  /**
+   * Releases the consumer permit obtained by {@link #acquire}.
+   */
+  public void release() {
+    _semaphore.release();
+  }
+
+  @VisibleForTesting
+  Semaphore getSemaphore() {
+    return _semaphore;
+  }
+
+  /**
+   * Records that a segment was registered on this server.
+   *
+   * <p>This advances the sequence-number watermark unless the ideal state is always used. It then wakes every thread
+   * waiting for a previous segment so that each can re-check its own condition.
+   *
+   * @param llcSegmentName the segment that was just registered
+   */
+  public void trackSegment(LLCSegmentName llcSegmentName) {
+    _lock.lock();
+    try {
+      if (!_alwaysRelyOnIdealState) {
+        _maxSegmentSeqNumRegistered = Math.max(_maxSegmentSeqNumRegistered, llcSegmentName.getSequenceNumber());
+      }
+      // notify all helix threads waiting for their offline -> consuming segment's prev segment to be loaded
+      _condition.signalAll();
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  private void waitForPrevSegment(LLCSegmentName currSegment)
+      throws InterruptedException {
+
+    if (_alwaysRelyOnIdealState || !_firstTransitionProcessed.get()) {
+      // if _alwaysRelyOnIdealState or no offline -> consuming transition has been processed, it means rely on
+      // ideal state to fetch previous segment.
+      awaitForPreviousSegmentFromIdealState(currSegment);
+
+      // The first transition is always prone to error. For example, the segment before the current transition's
+      // segment may have been deleted before this server came up, so that previous segment will never be
+      // registered here. For that reason the first transition always goes through the ideal state, and
+      // _firstTransitionProcessed records that it has been handled.
+      _firstTransitionProcessed.set(true);
+      return;
+    }
+
+    // rely on _maxSegmentSeqNumRegistered watermark for previous segment.
+    if (awaitForPreviousSegmentSequenceNumber(currSegment, WAIT_INTERVAL_MS)) {
+      return;
+    }
+
+    // tried using prevSegSeqNumber watermark, but could not acquire the previous segment.
+    // fallback to acquire prev segment from ideal state.
+    awaitForPreviousSegmentFromIdealState(currSegment);
+  }
+
+  /**
+   * Resolves the previous segment of {@code currSegment} from the ideal state and waits until it is registered on
+   * this server.
+   *
+   * <p>Returns immediately when the ideal state yields no previous segment. After each full {@code WAIT_INTERVAL_MS}
+   * without success, the previous segment is resolved again from the ideal state, because it may have changed.
+   *
+   * @param currSegment segment of the current helix transition
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  private void awaitForPreviousSegmentFromIdealState(LLCSegmentName currSegment)
+      throws InterruptedException {
+    String previousSegment = getPreviousSegmentFromIdealState(currSegment);
+    if (previousSegment == null) {
+      // previous segment can only be null if either all the previous segments are deleted or this is the starting
+      // sequence segment of the partition Group.
+      return;
+    }
+
+    long startTimeMs = System.currentTimeMillis();
+    SegmentDataManager segmentDataManager = null;
+    try {
+      _lock.lock();
+      try {
+        // Look the segment up while holding the lock. trackSegment() signals under the same lock, so a registration
+        // that happens after this check cannot signal before this thread starts waiting (no lost wake-up).
+        segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
+        // Remaining time in the current wait interval. It is not reset by signals for other segments, so the ideal
+        // state is still refreshed periodically while unrelated segments keep registering.
+        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MS);
+        while (segmentDataManager == null) {
+          // segment is not loaded in the server yet; wait until it's loaded.
+          remainingNanos = _condition.awaitNanos(remainingNanos);
+          if (remainingNanos <= 0) {
+            LOGGER.warn("Semaphore access denied to segment: {}. Waited on previous segment: {} for: {}ms.",
+                currSegment.getSegmentName(), previousSegment, System.currentTimeMillis() - startTimeMs);
+            // waited a full interval, fetch previous segment again from ideal state as previous segment might be
+            // changed in ideal state.
+            previousSegment = getPreviousSegmentFromIdealState(currSegment);
+            if (previousSegment == null) {
+              return;
+            }
+            remainingNanos = TimeUnit.MILLISECONDS.toNanos(WAIT_INTERVAL_MS);
+          }
+          segmentDataManager = _realtimeTableDataManager.acquireSegment(previousSegment);
+        }
+      } finally {
+        _lock.unlock();
+      }
+    } finally {
+      if (segmentDataManager != null) {
+        _realtimeTableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+  }
+
+  /**
+   * Waits until a segment with sequence number of at least {@code currSegment.getSequenceNumber() - 1} has been
+   * passed to {@link #trackSegment}.
+   *
+   * @param currSegment is the segment of current helix transition.
+   * @param timeoutMs is the maximum total time to wait in millis. Wake-ups caused by other segments being tracked do
+   *                  not restart the timeout.
+   * @return true if previous Segment was registered to the server, else false.
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   */
+  @VisibleForTesting
+  boolean awaitForPreviousSegmentSequenceNumber(LLCSegmentName currSegment, long timeoutMs)
+      throws InterruptedException {
+    long startTimeMs = System.currentTimeMillis();
+    int prevSeqNum = currSegment.getSequenceNumber() - 1;
+    // Track the remaining budget across wake-ups. A plain await(timeoutMs) would restart the full timeout every time
+    // trackSegment() signals for a segment other than the awaited one.
+    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+    _lock.lock();
+    try {
+      while (_maxSegmentSeqNumRegistered < prevSeqNum) {
+        if (remainingNanos <= 0) {
+          LOGGER.warn(
+              "Semaphore access denied to segment: {}. Waited on previous segment with sequence number: {} for: {}ms.",
+              currSegment.getSegmentName(), prevSeqNum, System.currentTimeMillis() - startTimeMs);
+          // waited until the timeout. Caller relies on ideal state now.
+          return false;
+        }
+        // it means the previous segment is not loaded in the server. Wait until it's loaded.
+        remainingNanos = _condition.awaitNanos(remainingNanos);
+      }
+      return true;
+    } finally {
+      _lock.unlock();
+    }
+  }
+
+  /**
+   * Finds the previous segment of {@code currSegment} in the ideal state.
+   *
+   * <p>Only LLC segments that meet all of the following are considered:
+   * <ul>
+   *   <li>ONLINE on this server instance;</li>
+   *   <li>in the same partition group;</li>
+   *   <li>with a lower sequence number.</li>
+   * </ul>
+   * The one with the highest sequence number is returned. The lookup time is recorded as
+   * {@code PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS}.
+   *
+   * @param currSegment segment of the current helix transition
+   * @return the previous segment name, or null if no such segment exists
+   */
+  @VisibleForTesting
+  @Nullable
+  String getPreviousSegmentFromIdealState(LLCSegmentName currSegment) {
+    long startTimeMs = System.currentTimeMillis();
+    // if seq num of current segment is 102, maxSequenceNumBelowCurrentSegment must be highest seq num of any segment
+    // created before current segment
+    int maxSequenceNumBelowCurrentSegment = -1;
+    String previousSegment = null;
+    int currPartitionGroupId = currSegment.getPartitionGroupId();
+    int currSequenceNum = currSegment.getSequenceNumber();
+    Map<String, Map<String, String>> segmentAssignment = getSegmentAssignment();
+    String currentServerInstanceId = _realtimeTableDataManager.getServerInstance();
+
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      String state = instanceStateMap.get(currentServerInstanceId);
+
+      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)) {
+        // if server is looking for previous segment to current transition's segment, it means the previous segment
+        // has to be online in the instance. If all previous segments are not online, we just allow the current helix
+        // transition to go ahead.
+        continue;
+      }
+
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName == null) {
+        // ignore uploaded segments
+        continue;
+      }
+
+      if (llcSegmentName.getPartitionGroupId() != currPartitionGroupId) {
+        // ignore segments of different partitions.
+        continue;
+      }
+
+      if (llcSegmentName.getSequenceNumber() >= currSequenceNum) {
+        // ignore segments with higher sequence number than existing helix transition segment.
+        continue;
+      }
+
+      if (llcSegmentName.getSequenceNumber() > maxSequenceNumBelowCurrentSegment) {
+        maxSequenceNumBelowCurrentSegment = llcSegmentName.getSequenceNumber();
+        // also track the name of segment
+        previousSegment = segmentName;
+      }
+    }
+
+    long timeSpentMs = System.currentTimeMillis() - startTimeMs;
+    LOGGER.info("Fetched previous segment: {} to current segment: {} in: {}ms.", previousSegment, currSegment,
+        timeSpentMs);
+    _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
+        ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, timeSpentMs, TimeUnit.MILLISECONDS);
+
+    return previousSegment;
+  }
+
+  /**
+   * Reads the table's ideal state and returns its map fields (segment name -> instance -> state).
+   *
+   * @throws IllegalStateException if the ideal state cannot be found
+   */
+  @VisibleForTesting
+  Map<String, Map<String, String>> getSegmentAssignment() {
+    IdealState idealState = HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(),
+        _realtimeTableDataManager.getTableName());
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s",
+        _realtimeTableDataManager.getTableName());
+    return idealState.getRecord().getMapFields();
+  }
+
+  @VisibleForTesting
+  Lock getLock() {
+    return _lock;
+  }
+
+  @VisibleForTesting
+  AtomicBoolean getFirstTransitionProcessed() {
+    return _firstTransitionProcessed;
+  }
+
+  // this should not be used outside of tests.
+  @VisibleForTesting
+  int getMaxSegmentSeqNumLoaded() {
+    return _maxSegmentSeqNumRegistered;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 229799e8d2..9011318891 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -247,7 +247,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   // Semaphore for each partitionGroupId only, which is to prevent two 
different stream consumers
   // from consuming with the same partitionGroupId in parallel in the same 
host.
   // See the comments in {@link RealtimeTableDataManager}.
-  private final Semaphore _partitionGroupConsumerSemaphore;
+  private final ConsumerCoordinator _consumerCoordinator;
   // A boolean flag to check whether the current thread has acquired the 
semaphore.
   // This boolean is needed because the semaphore is shared by threads; every 
thread holding this semaphore can
   // modify the permit. This boolean make sure the semaphore gets released 
only once when the partition group stops
@@ -1066,7 +1066,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
   @VisibleForTesting
   Semaphore getPartitionGroupConsumerSemaphore() {
-    return _partitionGroupConsumerSemaphore;
+    return _consumerCoordinator.getSemaphore();
   }
 
   @VisibleForTesting
@@ -1280,8 +1280,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     closePartitionGroupConsumer();
     closePartitionMetadataProvider();
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
-      _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore");
-      _partitionGroupConsumerSemaphore.release();
+      _segmentLogger.info("Releasing the consumer semaphore");
+      _consumerCoordinator.release();
     }
   }
 
@@ -1540,7 +1540,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded 
the segment and we don't reach here.
   public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, 
TableConfig tableConfig,
       RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore 
partitionGroupConsumerSemaphore,
+      Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator 
consumerCoordinator,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
       @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isReadyToConsumeData)
       throws AttemptsExceededException, RetriableOperationException {
@@ -1585,7 +1585,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             _segmentZKMetadata.getEndOffset() == null ? null
                 : 
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getEndOffset()),
             _segmentZKMetadata.getStatus().toString());
-    _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
+    _consumerCoordinator = consumerCoordinator;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     InstanceDataManagerConfig instanceDataManagerConfig = 
indexLoadingConfig.getInstanceDataManagerConfig();
     String clientIdSuffix =
@@ -1679,11 +1679,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
     // Acquire semaphore to create stream consumers
     try {
-      long startTimeMs = System.currentTimeMillis();
-      while (!_partitionGroupConsumerSemaphore.tryAcquire(5, 
TimeUnit.MINUTES)) {
-        _segmentLogger.warn("Failed to acquire partitionGroupConsumerSemaphore 
in: {} ms. Retrying.",
-            System.currentTimeMillis() - startTimeMs);
-      }
+      _consumerCoordinator.acquire(llcSegmentName);
       _acquiredConsumerSemaphore.set(true);
     } catch (InterruptedException e) {
       String errorMsg = "InterruptedException when acquiring the 
partitionConsumerSemaphore";
@@ -1714,8 +1710,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       // In case of exception thrown here, segment goes to ERROR state. Then 
any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the 
semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via 
Helix Admin.
-      _segmentLogger.info("Releasing the _partitionGroupConsumerSemaphore");
-      _partitionGroupConsumerSemaphore.release();
+      _segmentLogger.info("Releasing the consumer semaphore");
+      _consumerCoordinator.release();
       _acquiredConsumerSemaphore.set(false);
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(),
           "Failed to initialize segment data manager", t));
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index b070b743e0..7bffd4fdd8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -75,6 +75,8 @@ import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -106,8 +108,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   // consuming from the same stream partition can lead to bugs.
   // The semaphores will stay in the hash map even if the consuming partitions 
move to a different host.
   // We expect that there will be a small number of semaphores, but that may 
be ok.
-  private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new 
ConcurrentHashMap<>();
-
+  private final Map<Integer, ConsumerCoordinator> 
_partitionGroupIdToConsumerCoordinatorMap =
+      new ConcurrentHashMap<>();
   // The old name of the stats file used to be stats.ser which we changed when 
we moved all packages
   // from com.linkedin to org.apache because of not being able to deserialize 
the old files using the newer classes
   private static final String STATS_FILE_NAME = "segment-stats.ser";
@@ -142,6 +144,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private TableUpsertMetadataManager _tableUpsertMetadataManager;
   private BooleanSupplier _isTableReadyToConsumeData;
+  private boolean _enforceConsumptionInOrder = false;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
     this(segmentBuildSemaphore, () -> true);
@@ -223,6 +226,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       _tableUpsertMetadataManager.init(_tableConfig, schema, this);
     }
 
+    _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
+
     // For dedup and partial-upsert, need to wait for all segments loaded 
before starting consuming data
     if (isDedupEnabled() || isPartialUpsertEnabled()) {
       _isTableReadyToConsumeData = new BooleanSupplier() {
@@ -511,7 +516,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   private void doAddConsumingSegment(String segmentName)
       throws AttemptsExceededException, RetriableOperationException {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
-    if (zkMetadata.getStatus() != Status.IN_PROGRESS) {
+    if ((zkMetadata.getStatus() != Status.IN_PROGRESS) && 
(!_enforceConsumptionInOrder)) {
       // NOTE: We do not throw exception here because the segment might have 
just been committed before the state
       //       transition is processed. We can skip adding this segment, and 
the segment will enter CONSUMING state in
       //       Helix, then we can rely on the following CONSUMING -> ONLINE 
state transition to add it.
@@ -543,7 +548,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Generates only one semaphore for every partition
     LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
     int partitionGroupId = llcSegmentName.getPartitionGroupId();
-    Semaphore semaphore = 
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new 
Semaphore(1));
+    ConsumerCoordinator consumerCoordinator = 
getConsumerCoordinator(partitionGroupId);
 
     // Create the segment data manager and register it
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
@@ -553,8 +558,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
             : null;
     RealtimeSegmentDataManager realtimeSegmentDataManager =
-        createRealtimeSegmentDataManager(zkMetadata, tableConfig, 
indexLoadingConfig, schema, llcSegmentName, semaphore,
-            partitionUpsertMetadataManager, partitionDedupMetadataManager, 
_isTableReadyToConsumeData);
+        createRealtimeSegmentDataManager(zkMetadata, tableConfig, 
indexLoadingConfig, schema, llcSegmentName,
+            consumerCoordinator, partitionUpsertMetadataManager, 
partitionDedupMetadataManager,
+            _isTableReadyToConsumeData);
     registerSegment(segmentName, realtimeSegmentDataManager, 
partitionUpsertMetadataManager);
     if (partitionUpsertMetadataManager != null) {
       partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName);
@@ -641,12 +647,12 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   @VisibleForTesting
   protected RealtimeSegmentDataManager 
createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
       TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema 
schema, LLCSegmentName llcSegmentName,
-      Semaphore semaphore, PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
+      ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, 
BooleanSupplier isTableReadyToConsumeData)
       throws AttemptsExceededException, RetriableOperationException {
     return new RealtimeSegmentDataManager(zkMetadata, tableConfig, this, 
_indexDir.getAbsolutePath(),
-        indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, 
partitionUpsertMetadataManager,
-        partitionDedupMetadataManager, isTableReadyToConsumeData);
+        indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, 
_serverMetrics,
+        partitionUpsertMetadataManager, partitionDedupMetadataManager, 
isTableReadyToConsumeData);
   }
 
   /**
@@ -803,6 +809,21 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     registerSegment(segmentName, segmentDataManager);
   }
 
+  /**
+   * Registers the segment with the base implementation.
+   *
+   * <p>When in-order consumption is enforced, it also reports the segment to its partition group's
+   * {@link ConsumerCoordinator}, so that helix threads waiting on a previous segment are woken up.
+   *
+   * @return the data manager returned by the base implementation
+   */
+  @Override
+  protected SegmentDataManager registerSegment(String segmentName, SegmentDataManager segmentDataManager) {
+    SegmentDataManager previousManager = super.registerSegment(segmentName, segmentDataManager);
+    if (!_enforceConsumptionInOrder) {
+      return previousManager;
+    }
+    // Names that do not parse as LLC segment names (LLCSegmentName.of returns null) have no sequence number to track.
+    LLCSegmentName parsedName = LLCSegmentName.of(segmentName);
+    if (parsedName != null) {
+      getConsumerCoordinator(parsedName.getPartitionGroupId()).trackSegment(parsedName);
+    }
+    return previousManager;
+  }
+
   /**
    * Replaces the CONSUMING segment with a downloaded committed one.
    */
@@ -852,6 +873,23 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return Collections.emptyMap();
   }
 
+  /**
+   * Returns the table's stream ingestion config, or null when the table has no ingestion config (or the ingestion
+   * config itself returns null).
+   */
+  @Nullable
+  public StreamIngestionConfig getStreamIngestionConfig() {
+    IngestionConfig config = _tableConfig.getIngestionConfig();
+    if (config == null) {
+      return null;
+    }
+    return config.getStreamIngestionConfig();
+  }
+
+  /**
+   * Returns the coordinator for the given partition group, creating it lazily on first use.
+   *
+   * <p>A new coordinator captures the current value of {@code _enforceConsumptionInOrder}.
+   */
+  @VisibleForTesting
+  ConsumerCoordinator getConsumerCoordinator(int partitionGroupId) {
+    Map<Integer, ConsumerCoordinator> coordinators = _partitionGroupIdToConsumerCoordinatorMap;
+    return coordinators.computeIfAbsent(partitionGroupId,
+        ignoredKey -> new ConsumerCoordinator(_enforceConsumptionInOrder, this));
+  }
+
+  /**
+   * Test hook that overrides the table-level in-order consumption flag.
+   *
+   * <p>The new value affects whether registerSegment notifies coordinators, and it is captured by coordinators that
+   * getConsumerCoordinator creates afterwards. Coordinators created earlier keep the value they were constructed with.
+   */
+  @VisibleForTesting
+  void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) {
+    _enforceConsumptionInOrder = enforceConsumptionInOrder;
+  }
+
   /**
    * Validate a schema against the table config for real-time record 
consumption.
    * Ideally, we should validate these things when schema is added or table is 
created, but either of these
@@ -886,4 +924,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // 2. Validate the schema itself
     SchemaUtils.validate(schema);
   }
+
+  /**
+   * Reads {@code enforceConsumptionInOrder} from the table's stream ingestion config. Returns false when that config
+   * is absent.
+   */
+  private boolean isEnforceConsumptionInOrder() {
+    StreamIngestionConfig config = getStreamIngestionConfig();
+    if (config == null) {
+      return false;
+    }
+    return config.isEnforceConsumptionInOrder();
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
new file mode 100644
index 0000000000..a2b01a44ea
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ConsumerCoordinatorTest.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.cache.CacheBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.util.TestUtils;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class ConsumerCoordinatorTest {
+
+  private static class FakeRealtimeTableDataManager extends 
RealtimeTableDataManager {
+    private final StreamIngestionConfig _streamIngestionConfig;
+    private ConsumerCoordinator _consumerCoordinator;
+
+    public FakeRealtimeTableDataManager(Semaphore segmentBuildSemaphore,
+        boolean useIdealStateToCalculatePreviousSegment) {
+      super(segmentBuildSemaphore);
+      super._recentlyDeletedSegments = CacheBuilder.newBuilder().build();
+      StreamIngestionConfig streamIngestionConfig = new 
StreamIngestionConfig(List.of(new HashMap<>()));
+      streamIngestionConfig.setEnforceConsumptionInOrder(true);
+      if (useIdealStateToCalculatePreviousSegment) {
+        streamIngestionConfig.setUseIdealStateToCalculatePreviousSegment(true);
+      }
+      _streamIngestionConfig = streamIngestionConfig;
+    }
+
+    @Override
+    ConsumerCoordinator getConsumerCoordinator(int partitionId) {
+      return _consumerCoordinator;
+    }
+
+    public void setConsumerCoordinator(ConsumerCoordinator 
consumerCoordinator) {
+      _consumerCoordinator = consumerCoordinator;
+    }
+
+    @Override
+    public StreamIngestionConfig getStreamIngestionConfig() {
+      return _streamIngestionConfig;
+    }
+
+    @Override
+    public String getServerInstance() {
+      return "server_1";
+    }
+  }
+
+  private static class FakeConsumerCoordinator extends ConsumerCoordinator {
+    private final Map<String, Map<String, String>> _segmentAssignmentMap;
+
+    public FakeConsumerCoordinator(boolean enforceConsumptionInOrder,
+        RealtimeTableDataManager realtimeTableDataManager) {
+      super(enforceConsumptionInOrder, realtimeTableDataManager);
+      Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+        put("server_1", "ONLINE");
+        put("server_3", "ONLINE");
+      }};
+      _segmentAssignmentMap = new HashMap<>() {{
+        put("tableTest_REALTIME__1__101__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__2__101__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__2__100__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__1__1__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__1__14__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__1__91__20250304T0035Z", 
serverSegmentStatusMap);
+        put("tableTest_REALTIME__1__90__20250304T0035Z", 
serverSegmentStatusMap);
+      }};
+    }
+
+    @Override
+    public Map<String, Map<String, String>> getSegmentAssignment() {
+      return _segmentAssignmentMap;
+    }
+  }
+
+  @Test
+  public void testAwaitForPreviousSegmentSequenceNumber()
+      throws InterruptedException {
+    // 1. enable tracking segment seq num.
+    FakeRealtimeTableDataManager realtimeTableDataManager = new 
FakeRealtimeTableDataManager(null, false);
+    realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+    realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+
+    // 2. check if thread waits on prev segment seq
+    AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+    Thread thread1 = new Thread(() -> {
+      LLCSegmentName llcSegmentName = getLLCSegment(101);
+      try {
+        boolean b = 
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 5000);
+        atomicBoolean.set(b);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    thread1.start();
+
+    // 3. add prev segment and check if thread is unblocked.
+    consumerCoordinator.trackSegment(getLLCSegment(100));
+
+    TestUtils.waitForCondition(aVoid -> atomicBoolean.get(), 4000,
+        "Thread waiting on previous segment should have been unblocked.");
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+
+    // 4. check if second thread waits on prev segment seq until timeout and 
returns false
+    AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
+    Thread thread2 = new Thread(() -> {
+      LLCSegmentName llcSegmentName = getLLCSegment(102);
+      try {
+        boolean b = 
consumerCoordinator.awaitForPreviousSegmentSequenceNumber(llcSegmentName, 500);
+        atomicBoolean2.set(b);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    thread2.start();
+
+    Thread.sleep(1500);
+
+    Assert.assertFalse(atomicBoolean2.get());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 100);
+  }
+
+  @Test
+  public void testFirstConsumer()
+      throws InterruptedException {
+    // 1. Enable tracking segment seq num.
+    FakeRealtimeTableDataManager realtimeTableDataManager = new 
FakeRealtimeTableDataManager(null, false);
+    realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+    realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+    ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+    RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = 
getMockedRealtimeSegmentDataManager();
+    Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+      put("server_1", "ONLINE");
+      put("server_3", "ONLINE");
+    }};
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(102), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(104), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(106), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(107), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(109), 
serverSegmentStatusMap);
+
+    // 2. create multiple helix transitions in this order: 106, 109, 104, 107
+    Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(106));
+    Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(109));
+    Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(104));
+    Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(107));
+
+    thread1.start();
+
+    Thread.sleep(1000);
+
+    // 3. load segment 100, 101, 102
+    realtimeTableDataManager.registerSegment(getSegmentName(100), 
mockedRealtimeSegmentDataManager);
+    realtimeTableDataManager.registerSegment(getSegmentName(101), 
mockedRealtimeSegmentDataManager);
+    realtimeTableDataManager.registerSegment(getSegmentName(102), 
mockedRealtimeSegmentDataManager);
+    Thread.sleep(1000);
+
+    // 4. check all of the above threads wait
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    thread2.start();
+    thread3.start();
+    thread4.start();
+
+    Thread.sleep(1000);
+
+    // 5. check that first thread acquiring semaphore is of segment 104
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    realtimeTableDataManager.registerSegment(getSegmentName(104), 
mockedRealtimeSegmentDataManager);
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 104);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
1);
+
+    // 6. check the next threads acquiring semaphore is 106
+    consumerCoordinator.getSemaphore().release();
+    realtimeTableDataManager.registerSegment(getSegmentName(106), 
mockedRealtimeSegmentDataManager);
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 106);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
1);
+  }
+
+  @Test
+  public void testSequentialOrderNotRelyingOnIdealState()
+      throws InterruptedException {
+    // 1. Enable tracking segment seq num.
+    FakeRealtimeTableDataManager realtimeTableDataManager = new 
FakeRealtimeTableDataManager(null, false);
+    realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+    realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+    ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+
+    // 2. check first transition blocked on ideal state
+    Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(101));
+    thread1.start();
+
+    Thread.sleep(2000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+    
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = 
getMockedRealtimeSegmentDataManager();
+
+    // 3. register older segment and check seq num watermark and semaphore.
+    realtimeTableDataManager.registerSegment(getSegmentName(90), 
mockedRealtimeSegmentDataManager);
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 90);
+    
Assert.assertFalse(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    // 4. register prev segment and check watermark and if thread was unblocked
+    realtimeTableDataManager.registerSegment(getSegmentName(91), 
mockedRealtimeSegmentDataManager);
+    Thread.sleep(1000);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    // 5. check that all the following transitions rely on seq num watermark 
and gets blocked.
+    Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102));
+    Thread thread3 = getNewThread(consumerCoordinator, getLLCSegment(103));
+    Thread thread4 = getNewThread(consumerCoordinator, getLLCSegment(104));
+    thread3.start();
+    thread2.start();
+    thread4.start();
+
+    Thread.sleep(2000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    // 6. check that all above threads are still blocked even if semaphore is 
released.
+    consumerCoordinator.getSemaphore().release();
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 91);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    // 6. mark 101 seg as complete. Check 102 acquired the semaphore.
+    realtimeTableDataManager.registerSegment(getSegmentName(101), 
mockedRealtimeSegmentDataManager);
+
+    Thread.sleep(1000);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 101);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
0);
+
+    // 7. register 102 seg, check if seg 103 is waiting on semaphore.
+    realtimeTableDataManager.registerSegment(getSegmentName(102), 
mockedRealtimeSegmentDataManager);
+
+    Thread.sleep(1000);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
1);
+
+    // 8. release the semaphore and check if semaphore is acquired by seg 103.
+    consumerCoordinator.getSemaphore().release();
+    Thread.sleep(1000);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 102);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
0);
+
+    // 8. register 103 seg and check if seg 104 is now queued on semaphore
+    realtimeTableDataManager.registerSegment(getSegmentName(103), 
mockedRealtimeSegmentDataManager);
+    Thread.sleep(1000);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), 103);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
1);
+  }
+
+  @Test
+  public void testSequentialOrderRelyingOnIdealState()
+      throws InterruptedException {
+    FakeRealtimeTableDataManager realtimeTableDataManager = new 
FakeRealtimeTableDataManager(null, true);
+    realtimeTableDataManager.setEnforceConsumptionInOrder(true);
+
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+    realtimeTableDataManager.setConsumerCoordinator(consumerCoordinator);
+
+    // 1. test that acquire blocks when prev segment is not loaded.
+    Thread thread = getNewThread(consumerCoordinator, getLLCSegment(101));
+    thread.start();
+
+    ReentrantLock lock = (ReentrantLock) consumerCoordinator.getLock();
+
+    RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = 
Mockito.mock(RealtimeSegmentDataManager.class);
+    
Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true);
+    Assert.assertNotNull(realtimeTableDataManager);
+
+    // prev segment has seq 91, so registering seq 90 won't do anything.
+    realtimeTableDataManager.registerSegment(getSegmentName(90), 
mockedRealtimeSegmentDataManager);
+
+    Thread.sleep(2000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+
+    // 2. test that registering prev segment will unblock thread.
+    realtimeTableDataManager.registerSegment(getSegmentName(91), 
mockedRealtimeSegmentDataManager);
+
+    TestUtils.waitForCondition(aVoid -> 
(consumerCoordinator.getSemaphore().availablePermits() == 0), 5000,
+        "Semaphore must be acquired after registering previous segment");
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertTrue(consumerCoordinator.getFirstTransitionProcessed().get());
+
+    consumerCoordinator.release();
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+    realtimeTableDataManager.registerSegment(getSegmentName(101), 
mockedRealtimeSegmentDataManager);
+
+    // 3. test that segment 103 will be blocked.
+    Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+      put("server_1", "ONLINE");
+      put("server_3", "ONLINE");
+    }};
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(102), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(103), 
serverSegmentStatusMap);
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(104), 
serverSegmentStatusMap);
+
+    Thread thread1 = getNewThread(consumerCoordinator, getLLCSegment(103));
+    thread1.start();
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+
+    // 3. test that segment 102 will acquire semaphore.
+    Thread thread2 = getNewThread(consumerCoordinator, getLLCSegment(102));
+    thread2.start();
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+
+    // 4. registering seg 102 should unblock seg 103
+    realtimeTableDataManager.registerSegment(getSegmentName(102), 
mockedRealtimeSegmentDataManager);
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
1);
+
+    // 5. releasing semaphore should let seg 103 acquire it
+    consumerCoordinator.getSemaphore().release();
+
+    Thread.sleep(1000);
+
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().getQueueLength(), 
0);
+    Assert.assertFalse(lock.hasQueuedThreads() && lock.isLocked());
+    Assert.assertEquals(consumerCoordinator.getMaxSegmentSeqNumLoaded(), -1);
+  }
+
+  @Test
+  public void testRandomOrder()
+      throws InterruptedException {
+    RealtimeTableDataManager realtimeTableDataManager = 
Mockito.mock(RealtimeTableDataManager.class);
+    
Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
+
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(false, realtimeTableDataManager);
+
+    String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    Assert.assertNotNull(llcSegmentName);
+    consumerCoordinator.acquire(llcSegmentName);
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
0);
+    Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+
+    consumerCoordinator.release();
+    Assert.assertEquals(consumerCoordinator.getSemaphore().availablePermits(), 
1);
+    Assert.assertFalse(consumerCoordinator.getSemaphore().hasQueuedThreads());
+  }
+
+  @Test
+  public void testPreviousSegment() {
+    RealtimeTableDataManager realtimeTableDataManager = 
Mockito.mock(RealtimeTableDataManager.class);
+    
Mockito.when(realtimeTableDataManager.getTableName()).thenReturn("tableTest_REALTIME");
+    
Mockito.when(realtimeTableDataManager.getServerInstance()).thenReturn("server_1");
+
+    FakeConsumerCoordinator consumerCoordinator = new 
FakeConsumerCoordinator(true, realtimeTableDataManager);
+
+    String segmentName = "tableTest_REALTIME__1__101__20250304T0035Z";
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    Assert.assertNotNull(llcSegmentName);
+    String previousSegment = 
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
+    Assert.assertEquals(previousSegment, 
"tableTest_REALTIME__1__91__20250304T0035Z");
+
+    consumerCoordinator.getSegmentAssignment().clear();
+    Map<String, String> serverSegmentStatusMap = new HashMap<>() {{
+      put("server_3", "ONLINE");
+    }};
+    consumerCoordinator.getSegmentAssignment().put(getSegmentName(100), 
serverSegmentStatusMap);
+    previousSegment = 
consumerCoordinator.getPreviousSegmentFromIdealState(llcSegmentName);
+    Assert.assertNull(previousSegment);
+  }
+
+  private Thread getNewThread(FakeConsumerCoordinator consumerCoordinator, 
LLCSegmentName llcSegmentName) {
+    return new Thread(() -> {
+      try {
+        consumerCoordinator.acquire(llcSegmentName);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }, String.valueOf(llcSegmentName.getSequenceNumber()));
+  }
+
+  private RealtimeSegmentDataManager getMockedRealtimeSegmentDataManager() {
+    RealtimeSegmentDataManager mockedRealtimeSegmentDataManager = 
Mockito.mock(RealtimeSegmentDataManager.class);
+    
Mockito.when(mockedRealtimeSegmentDataManager.increaseReferenceCount()).thenReturn(true);
+    Assert.assertNotNull(mockedRealtimeSegmentDataManager);
+    return mockedRealtimeSegmentDataManager;
+  }
+
+  private LLCSegmentName getLLCSegment(int seqNum) {
+    String segmentName = getSegmentName(seqNum);
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    Assert.assertNotNull(llcSegmentName);
+    return llcSegmentName;
+  }
+
+  private String getSegmentName(int seqNum) {
+    return "tableTest_REALTIME__1__" + seqNum + "__20250304T0035Z";
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 623d0de37d..72dbf26acb 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -97,7 +97,8 @@ public class RealtimeSegmentDataManagerTest {
   private static final long START_OFFSET_VALUE = 198L;
   private static final LongMsgOffset START_OFFSET = new 
LongMsgOffset(START_OFFSET_VALUE);
 
-  private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new 
ConcurrentHashMap<>();
+  private final Map<Integer, ConsumerCoordinator> 
_partitionGroupIdToConsumerCoordinatorMap =
+      new ConcurrentHashMap<>();
 
   private static TableConfig createTableConfig()
       throws Exception {
@@ -166,12 +167,13 @@ public class RealtimeSegmentDataManagerTest {
     
tableConfig.getIngestionConfig().setRetryOnSegmentBuildPrecheckFailure(true);
     RealtimeTableDataManager tableDataManager = 
createTableDataManager(tableConfig);
     LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
-    _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new 
Semaphore(1));
+    _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID,
+        new ConsumerCoordinator(false, tableDataManager));
     Schema schema = Fixtures.createSchema();
     ServerMetrics serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     return new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
tableDataManager,
         new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, 
llcSegmentName,
-        _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
+        _partitionGroupIdToConsumerCoordinatorMap, serverMetrics, 
timeSupplier);
   }
 
   @BeforeClass
@@ -992,7 +994,7 @@ public class RealtimeSegmentDataManagerTest {
     private boolean _notifySegmentBuildFailedWithDeterministicErrorCalled = 
false;
     public boolean _throwExceptionFromConsume = false;
     public boolean _postConsumeStoppedCalled = false;
-    public Map<Integer, Semaphore> _semaphoreMap;
+    public Map<Integer, ConsumerCoordinator> _consumerCoordinatorMap;
     public boolean _stubConsumeLoop = true;
     private TimeSupplier _timeSupplier;
     private boolean _indexCapacityThresholdBreached;
@@ -1009,12 +1011,13 @@ public class RealtimeSegmentDataManagerTest {
 
     public FakeRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, 
TableConfig tableConfig,
         RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, Schema schema,
-        LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, 
ServerMetrics serverMetrics,
-        TimeSupplier timeSupplier)
+        LLCSegmentName llcSegmentName, Map<Integer, ConsumerCoordinator> 
consumerCoordinatorMap,
+        ServerMetrics serverMetrics, TimeSupplier timeSupplier)
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), 
tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), 
serverMetrics, null, null, () -> true);
+          consumerCoordinatorMap.get(llcSegmentName.getPartitionGroupId()), 
serverMetrics, null, null,
+          () -> true);
       _state = RealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
       _shouldStop = 
RealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
@@ -1024,7 +1027,7 @@ public class RealtimeSegmentDataManagerTest {
       _segmentBuildFailedWithDeterministicError =
           
RealtimeSegmentDataManager.class.getDeclaredField("_segmentBuildFailedWithDeterministicError");
       _segmentBuildFailedWithDeterministicError.setAccessible(true);
-      _semaphoreMap = semaphoreMap;
+      _consumerCoordinatorMap = consumerCoordinatorMap;
       _streamMsgOffsetFactory = 
RealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
       _streamMsgOffsetFactory.setAccessible(true);
       _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
index 960138e8a0..8899fbd723 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.integration.tests.realtime.utils;
 
-import java.util.concurrent.Semaphore;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -43,16 +43,15 @@ public class FailureInjectingRealtimeSegmentDataManager 
extends RealtimeSegmentD
   /**
    * Creates a manager that will forcibly fail the commit segment step.
    */
-  public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata 
segmentZKMetadata,
-      TableConfig tableConfig, RealtimeTableDataManager 
realtimeTableDataManager, String resourceDataDir,
-      IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName 
llcSegmentName,
-      Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics,
-      boolean failCommit) throws AttemptsExceededException, 
RetriableOperationException {
+  public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata 
segmentZKMetadata, TableConfig tableConfig,
+      RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
+      Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator 
consumerCoordinator,
+      ServerMetrics serverMetrics, boolean failCommit)
+      throws AttemptsExceededException, RetriableOperationException {
     // Pass through to the real parent constructor
-    super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir,
-        indexLoadingConfig, schema, llcSegmentName, 
partitionGroupConsumerSemaphore, serverMetrics,
-        null /* no PartitionUpsertMetadataManager */, null /* no 
PartitionDedupMetadataManager */,
-        () -> true /* isReadyToConsumeData always true for tests */);
+    super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir, indexLoadingConfig, schema,
+        llcSegmentName, consumerCoordinator, serverMetrics, null /* no 
PartitionUpsertMetadataManager */,
+        null /* no PartitionDedupMetadataManager */, () -> true /* 
isReadyToConsumeData always true for tests */);
 
     _failCommit = failCommit;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 278e2bcb58..af65cd1a79 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.PauselessConsumptionUtils;
+import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -52,7 +53,7 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM
   @Override
   protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
       TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName,
-      Semaphore semaphore, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      ConsumerCoordinator consumerCoordinator, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier isTableReadyToConsumeData)
       throws AttemptsExceededException, RetriableOperationException {
 
@@ -61,6 +62,6 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM
       addFailureToCommits = false;
     }
     return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
-        indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, addFailureToCommits);
+        indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, addFailureToCommits);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index e1400620fb..da17a2c020 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -39,10 +39,17 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   private boolean _columnMajorSegmentBuilderEnabled = true;
 
   @JsonPropertyDescription("Whether to track offsets of the filtered stream messages during consumption.")
-  private boolean _trackFilteredMessageOffsets = false;
+  private boolean _trackFilteredMessageOffsets;
 
   @JsonPropertyDescription("Whether pauseless consumption is enabled for the table")
-  private boolean _pauselessConsumptionEnabled = false;
+  private boolean _pauselessConsumptionEnabled;
+
+  @JsonPropertyDescription("Enforce consumption of segments in order of segment creation by the controller")
+  private boolean _enforceConsumptionInOrder;
+
+  @JsonPropertyDescription("If enabled, Server always relies on ideal state to get previous segment. If disabled, "
+      + "server uses sequence id - 1 for previous segment")
+  private boolean _useIdealStateToCalculatePreviousSegment;
 
   @JsonPropertyDescription("Policy to determine the behaviour of parallel consumption.")
   private ParallelSegmentConsumptionPolicy _parallelSegmentConsumptionPolicy;
@@ -80,6 +87,22 @@ public class StreamIngestionConfig extends BaseJsonConfig {
     _pauselessConsumptionEnabled = pauselessConsumptionEnabled;
   }
 
+  public boolean isEnforceConsumptionInOrder() {
+    return _enforceConsumptionInOrder;
+  }
+
+  public void setEnforceConsumptionInOrder(boolean enforceConsumptionInOrder) {
+    _enforceConsumptionInOrder = enforceConsumptionInOrder;
+  }
+
+  public boolean isUseIdealStateToCalculatePreviousSegment() {
+    return _useIdealStateToCalculatePreviousSegment;
+  }
+
+  public void setUseIdealStateToCalculatePreviousSegment(boolean useIdealStateToCalculatePreviousSegment) {
+    _useIdealStateToCalculatePreviousSegment = useIdealStateToCalculatePreviousSegment;
+  }
+
   @Nullable
   public ParallelSegmentConsumptionPolicy getParallelSegmentConsumptionPolicy() {
     return _parallelSegmentConsumptionPolicy;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to