This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a23c610309b Fixes all race conditions from stream consumer (#17089)
a23c610309b is described below

commit a23c610309b506afe8342ec257cfaea0dfd7e87b
Author: NOOB <[email protected]>
AuthorDate: Mon Nov 17 17:16:57 2025 +0530

    Fixes all race conditions from stream consumer (#17089)
    
    * Fixes all race conditions from stream consumer
    
    * Adds test for tableDataManager cache
    
    * Fixes bug related to fetching partition offset
    
    * Fixes test and addresses comments
    
    * Reduces code duplication
---
 .../realtime/RealtimeSegmentDataManager.java       |  68 ++++-----
 .../realtime/RealtimeSegmentMetadataUtils.java     |  65 +++++++++
 .../manager/realtime/RealtimeTableDataManager.java |  43 ++++++
 .../realtime/RealtimeTableDataManagerTest.java     |  66 +++++++++
 .../stream/kafka20/KafkaConsumerFactory.java       |   9 ++
 .../SynchronizedKafkaStreamMetadataProvider.java   |  44 ++++++
 .../stream/kafka30/KafkaConsumerFactory.java       |   9 ++
 .../SynchronizedKafkaStreamMetadataProvider.java   |  44 ++++++
 .../pinot/server/api/resources/DebugResource.java  |  26 +++-
 .../pinot/server/api/resources/TablesResource.java |  24 ++--
 .../FreshnessBasedConsumptionStatusChecker.java    |  26 ++--
 .../IngestionBasedConsumptionStatusChecker.java    |   6 +-
 .../helix/OffsetBasedConsumptionStatusChecker.java |   4 +-
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 156 ++++++++++++++++-----
 .../OffsetBasedConsumptionStatusCheckerTest.java   |  18 +--
 .../pinot/spi/stream/StreamConsumerFactory.java    |   4 +
 .../pinot/spi/stream/StreamMetadataProvider.java   |   3 +
 17 files changed, 501 insertions(+), 114 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e372be01e0c..b8e2ce9097e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -94,7 +94,6 @@ import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -1082,21 +1081,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   /**
    * Returns the {@link ConsumerPartitionState} for the partition group.
    */
-  public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+  public Map<String, ConsumerPartitionState> getConsumerPartitionState(
+      @Nullable StreamPartitionMsgOffset latestMsgOffset) {
     String partitionGroupId = String.valueOf(_partitionGroupId);
-    return Collections.singletonMap(partitionGroupId, new 
ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
-        getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), 
_lastRowMetadata));
-  }
-
-  /**
-   * Returns the {@link PartitionLagState} for the partition group.
-   */
-  public Map<String, PartitionLagState> getPartitionToLagState(
-      Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
-    if (_partitionMetadataProvider == null) {
-      createPartitionMetadataProvider("Get Partition Lag State");
-    }
-    return 
_partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
+    return Collections.singletonMap(partitionGroupId,
+        new ConsumerPartitionState(partitionGroupId, getCurrentOffset(), 
getLastConsumedTimestamp(), latestMsgOffset,
+            _lastRowMetadata));
   }
 
   /**
@@ -1119,6 +1109,18 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     return _currentOffset;
   }
 
+  public String getTableStreamName() {
+    return _tableStreamName;
+  }
+
+  public int getStreamPartitionId() {
+    return _streamPartitionId;
+  }
+
+  public StreamConsumerFactory getStreamConsumerFactory() {
+    return _streamConsumerFactory;
+  }
+
   @Nullable
   public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
     return _latestStreamOffsetAtStartupTime;
@@ -1436,6 +1438,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     } while (!_shouldStop);
   }
 
+  public StreamMetadataProvider getPartitionMetadataProvider() {
+    return _partitionMetadataProvider;
+  }
+
   protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
     // Post segmentConsumed to current leader.
     // Retry maybe once if leader is not found.
@@ -1905,41 +1911,21 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   }
 
   @Nullable
-  public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs, 
boolean useDebugLog) {
-    return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 
maxWaitTimeMs, useDebugLog);
-  }
-
-  @Nullable
-  public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
-    return fetchLatestStreamOffset(maxWaitTimeMs, false);
-  }
-
-  @Nullable
-  public StreamPartitionMsgOffset fetchEarliestStreamOffset(long 
maxWaitTimeMs, boolean useDebugLog) {
-    return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 
maxWaitTimeMs, useDebugLog);
+  private StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) 
{
+    return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 
maxWaitTimeMs);
   }
 
   @Nullable
-  public StreamPartitionMsgOffset fetchEarliestStreamOffset(long 
maxWaitTimeMs) {
-    return fetchEarliestStreamOffset(maxWaitTimeMs, false);
-  }
-
-  @Nullable
-  private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria 
offsetCriteria, long maxWaitTimeMs,
-      boolean useDebugLog) {
+  private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria 
offsetCriteria, long maxWaitTimeMs) {
     if (_partitionMetadataProvider == null) {
       createPartitionMetadataProvider("Fetch latest stream offset");
     }
     try {
       return 
_partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, 
maxWaitTimeMs);
     } catch (Exception e) {
-      String logMessage = "Cannot fetch stream offset with criteria " + 
offsetCriteria + " for clientId " + _clientId
-          + " and partitionGroupId " + _partitionGroupId + " with maxWaitTime 
" + maxWaitTimeMs;
-      if (!useDebugLog) {
-        _segmentLogger.warn(logMessage, e);
-      } else {
-        _segmentLogger.debug(logMessage, e);
-      }
+      _segmentLogger.warn(
+          "Cannot fetch stream offset with criteria {} for clientId {} and 
partitionGroupId {} with maxWaitTime {}",
+          offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs, e);
     }
     return null;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java
new file mode 100644
index 00000000000..faaf183c17e
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentMetadataUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for realtime segment metadata operations.
+ */
+public class RealtimeSegmentMetadataUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentMetadataUtils.class);
+  private static final long STREAM_METADATA_FETCH_TIMEOUT_MS = 5000;
+
+  private RealtimeSegmentMetadataUtils() {
+  }
+
+  /**
+   * Fetches the latest stream offset for a segment's partition using the 
provided metadata provider.
+   * This encapsulates the common pattern of getting the partition ID from the 
segment and fetching
+   * the offset from the metadata provider.
+   *
+   * @param realtimeSegmentDataManager The segment data manager to get 
partition ID from
+   * @param streamMetadataProvider The stream metadata provider to use for 
fetching
+   * @return The latest stream offset for the partition, or null if not 
available
+   * @throws RuntimeException if fetching fails
+   */
+  @Nullable
+  public static StreamPartitionMsgOffset 
fetchLatestStreamOffset(RealtimeSegmentDataManager realtimeSegmentDataManager,
+      StreamMetadataProvider streamMetadataProvider) {
+    try {
+      int partitionId = realtimeSegmentDataManager.getStreamPartitionId();
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          
streamMetadataProvider.fetchLatestStreamOffset(Collections.singleton(partitionId),
+              STREAM_METADATA_FETCH_TIMEOUT_MS);
+      return partitionMsgOffsetMap.get(partitionId);
+    } catch (Exception e) {
+      String segmentName = realtimeSegmentDataManager.getSegmentName();
+      LOGGER.error("Failed to fetch latest stream offset for segment: {}", 
segmentName, e);
+      throw new RuntimeException("Failed to fetch latest stream offset for 
segment: " + segmentName, e);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 3a379c4f703..d6c8407abaf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -20,9 +20,13 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -31,6 +35,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -82,7 +87,9 @@ import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -133,6 +140,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   public static final long SLEEP_INTERVAL_MS = 30000; // 30 seconds sleep 
interval
   @Deprecated
   private static final String SEGMENT_DOWNLOAD_TIMEOUT_MINUTES = 
"segmentDownloadTimeoutMinutes";
+  private static final Duration STREAM_METADATA_PROVIDER_CACHE_TTL = 
Duration.ofMinutes(10);
+
+  protected Cache<String, StreamMetadataProvider> _streamMetadataProviderCache;
 
   private final BooleanSupplier _isServerReadyToServeQueries;
 
@@ -216,6 +226,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
 
     _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
+    _streamMetadataProviderCache = getStreamMetadataProviderCache();
 
     // For dedup and partial-upsert, need to wait for all segments loaded 
before starting consuming data
     if (isDedupEnabled() || isPartialUpsertEnabled()) {
@@ -248,6 +259,23 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
   }
 
+  @VisibleForTesting
+  protected Cache<String, StreamMetadataProvider> 
getStreamMetadataProviderCache() {
+    return CacheBuilder.newBuilder()
+        .expireAfterAccess(STREAM_METADATA_PROVIDER_CACHE_TTL)
+        .removalListener((RemovalNotification<String, StreamMetadataProvider> 
notification) -> {
+          StreamMetadataProvider provider = notification.getValue();
+          if (provider != null) {
+            try {
+              provider.close();
+            } catch (Exception e) {
+              LOGGER.warn("Failed to close StreamMetadataProvider for key {}", 
notification.getKey(), e);
+            }
+          }
+        })
+        .build();
+  }
+
   @Override
   protected void doStart() {
   }
@@ -355,6 +383,21 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     return segmentContexts;
   }
 
+  /**
+   * Returns a thread-safe StreamMetadataProvider that is shared across 
different callers.
+   */
+  public StreamMetadataProvider 
getStreamMetadataProvider(RealtimeSegmentDataManager 
realtimeSegmentDataManager) {
+    String tableStreamName = realtimeSegmentDataManager.getTableStreamName();
+    StreamConsumerFactory streamConsumerFactory = 
realtimeSegmentDataManager.getStreamConsumerFactory();
+    try {
+      return _streamMetadataProviderCache.get(tableStreamName,
+          () -> 
streamConsumerFactory.createStreamMetadataProvider(tableStreamName, true));
+    } catch (ExecutionException e) {
+      LOGGER.error("Failed to get stream metadata provider for tableStream: 
{}", tableStreamName);
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Returns all partitionGroupIds for the partitions hosted by this server 
for current table.
    * @apiNote this involves Zookeeper read and should not be used frequently 
due to efficiency concerns.
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
index ea266df007a..1c78091d16f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManagerTest.java
@@ -18,15 +18,27 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
+import java.time.Duration;
+import java.util.concurrent.Semaphore;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -60,4 +72,58 @@ public class RealtimeTableDataManagerTest {
     assertEquals(timeFieldSpec.getDefaultNullValue(),
         
Integer.parseInt(DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC).print(currentTimeMs)));
   }
+
+  @Test
+  public void testStreamMetadataProviderCache() {
+    class FakeRealtimeTableDataManager extends RealtimeTableDataManager {
+      private boolean _cacheEntryRemovalNotified;
+      public FakeRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+        super(segmentBuildSemaphore);
+        _streamMetadataProviderCache = getStreamMetadataProviderCache();
+      }
+      public void updateCache() {
+        _streamMetadataProviderCache = CacheBuilder.newBuilder()
+            .expireAfterAccess(Duration.ofMillis(1))
+            .removalListener((RemovalNotification<String, 
StreamMetadataProvider> notification) -> {
+              StreamMetadataProvider provider = notification.getValue();
+              if (provider != null) {
+                try {
+                  provider.close();
+                  _cacheEntryRemovalNotified = true;
+                } catch (Exception e) {
+                  LOGGER.warn("Failed to close StreamMetadataProvider for key 
{}", notification.getKey(), e);
+                }
+              }
+            })
+            .build();
+      }
+      public Cache<String, StreamMetadataProvider> getCache() {
+        return _streamMetadataProviderCache;
+      }
+    }
+    FakeRealtimeTableDataManager fakeRealtimeTableDataManager = new 
FakeRealtimeTableDataManager(null);
+
+    RealtimeSegmentDataManager mockRealtimeSegmentDataManager = 
Mockito.mock(RealtimeSegmentDataManager.class);
+    
when(mockRealtimeSegmentDataManager.getTableStreamName()).thenReturn("testTable-testTopic");
+    StreamConfig streamConfig = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+    when(mockRealtimeSegmentDataManager.getStreamConsumerFactory()).thenReturn(
+        StreamConsumerFactoryProvider.create(streamConfig));
+
+    StreamMetadataProvider streamMetadataProvider =
+        
fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager);
+    
Assert.assertEquals(fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager),
+        streamMetadataProvider);
+
+    fakeRealtimeTableDataManager.updateCache();
+    Assert.assertEquals(fakeRealtimeTableDataManager.getCache().size(), 0);
+
+    StreamMetadataProvider streamMetadataProvider1 =
+        
fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager);
+    Assert.assertNotEquals(streamMetadataProvider, streamMetadataProvider1);
+
+    TestUtils.waitForCondition(
+        aVoid -> 
!(fakeRealtimeTableDataManager.getStreamMetadataProvider(mockRealtimeSegmentDataManager)
+            .equals(streamMetadataProvider1)), 5, 2000, 
"streamMetadataProvider returned from cache must be new.");
+    Assert.assertTrue(fakeRealtimeTableDataManager._cacheEntryRemovalNotified);
+  }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 17d46c041be..849103c0b17 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -45,6 +45,15 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
 
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId, 
boolean concurrentAccessExpected) {
+    if (concurrentAccessExpected) {
+      return new SynchronizedKafkaStreamMetadataProvider(clientId, 
_streamConfig);
+    } else {
+      return createStreamMetadataProvider(clientId);
+    }
+  }
+
   @Override
   public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..5549dd734fc
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that 
synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link 
KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends 
KafkaStreamMetadataProvider {
+
+  public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig) {
+    super(clientId, streamConfig);
+  }
+
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    synchronized (this) {
+      return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
index d9ee7067dbc..8847efc68b5 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaConsumerFactory.java
@@ -45,6 +45,15 @@ public class KafkaConsumerFactory extends 
StreamConsumerFactory {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
 
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId, 
boolean concurrentAccessExpected) {
+    if (concurrentAccessExpected) {
+      return new SynchronizedKafkaStreamMetadataProvider(clientId, 
_streamConfig);
+    } else {
+      return createStreamMetadataProvider(clientId);
+    }
+  }
+
   @Override
   public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java
new file mode 100644
index 00000000000..ea7f5897537
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/SynchronizedKafkaStreamMetadataProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka30;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * A thread-safe variant of {@link KafkaStreamMetadataProvider} that 
synchronizes
+ * access to metadata fetch operations.
+ * This is particularly useful when a shared instance of {@link 
KafkaStreamMetadataProvider}
+ * is accessed concurrently by multiple threads.
+ */
+public class SynchronizedKafkaStreamMetadataProvider extends 
KafkaStreamMetadataProvider {
+
+  public SynchronizedKafkaStreamMetadataProvider(String clientId, StreamConfig 
streamConfig) {
+    super(clientId, streamConfig);
+  }
+
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    synchronized (this) {
+      return super.fetchLatestStreamOffset(partitionIds, timeoutMillis);
+    }
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index dffe1cb347b..9ea7529a4ad 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -54,6 +54,8 @@ import 
org.apache.pinot.common.restlet.resources.SegmentServerDebugInfo;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -64,6 +66,9 @@ import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,7 +129,7 @@ public class DebugResource {
     Map<String, SegmentErrorInfo> segmentErrorsMap = 
tableDataManager.getSegmentErrors();
     SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
     try {
-      SegmentConsumerInfo segmentConsumerInfo = 
getSegmentConsumerInfo(segmentDataManager, tableType);
+      SegmentConsumerInfo segmentConsumerInfo = 
getSegmentConsumerInfo(tableDataManager, segmentDataManager, tableType);
       long segmentSize = getSegmentSize(segmentDataManager);
       SegmentErrorInfo segmentErrorInfo = segmentErrorsMap.get(segmentName);
       return new SegmentServerDebugInfo(segmentName, 
FileUtils.byteCountToDisplaySize(segmentSize), segmentConsumerInfo,
@@ -172,7 +177,8 @@ public class DebugResource {
         segmentsWithDataManagers.add(segmentName);
 
         // Get segment consumer info.
-        SegmentConsumerInfo segmentConsumerInfo = 
getSegmentConsumerInfo(segmentDataManager, tableType);
+        SegmentConsumerInfo segmentConsumerInfo =
+            getSegmentConsumerInfo(tableDataManager, segmentDataManager, 
tableType);
 
         // Get segment size.
         long segmentSize = getSegmentSize(segmentDataManager);
@@ -209,17 +215,25 @@ public class DebugResource {
         .getSegment()).getSegmentSizeBytes() : 0;
   }
 
-  private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager 
segmentDataManager, TableType tableType) {
+  private SegmentConsumerInfo getSegmentConsumerInfo(TableDataManager 
tableDataManager,
+      SegmentDataManager segmentDataManager, TableType tableType) {
     SegmentConsumerInfo segmentConsumerInfo = null;
     if (tableType == TableType.REALTIME) {
       RealtimeSegmentDataManager realtimeSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-      Map<String, ConsumerPartitionState> partitionStateMap = 
realtimeSegmentDataManager.getConsumerPartitionState();
+      StreamMetadataProvider streamMetadataProvider =
+          ((RealtimeTableDataManager) 
(tableDataManager)).getStreamMetadataProvider(realtimeSegmentDataManager);
+      StreamPartitionMsgOffset latestMsgOffset =
+          
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(realtimeSegmentDataManager,
 streamMetadataProvider);
+      Map<String, ConsumerPartitionState> partitionIdToStateMap =
+          
realtimeSegmentDataManager.getConsumerPartitionState(latestMsgOffset);
       Map<String, String> currentOffsets = 
realtimeSegmentDataManager.getPartitionToCurrentOffset();
-      Map<String, String> upstreamLatest = 
partitionStateMap.entrySet().stream().collect(
+      Map<String, String> upstreamLatest = 
partitionIdToStateMap.entrySet().stream().collect(
           Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().getUpstreamLatestOffset().toString()));
       Map<String, String> recordsLagMap = new HashMap<>();
       Map<String, String> availabilityLagMsMap = new HashMap<>();
-      
realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k,
 v) -> {
+      Map<String, PartitionLagState> partitionToLagState =
+          
streamMetadataProvider.getCurrentPartitionLagState(partitionIdToStateMap);
+      partitionToLagState.forEach((k, v) -> {
         recordsLagMap.put(k, v.getRecordsLag());
         availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
       });
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b482d662a5e..4ad3a3810ac 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -35,7 +35,6 @@ import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -88,6 +87,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -108,6 +108,9 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -1101,11 +1104,17 @@ public class TablesResource {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         if (segmentDataManager instanceof RealtimeSegmentDataManager) {
           RealtimeSegmentDataManager realtimeSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-          Map<String, ConsumerPartitionState> partitionStateMap =
-              realtimeSegmentDataManager.getConsumerPartitionState();
+          StreamMetadataProvider streamMetadataProvider =
+              ((RealtimeTableDataManager) 
(tableDataManager)).getStreamMetadataProvider(realtimeSegmentDataManager);
+          StreamPartitionMsgOffset latestMsgOffset =
+              
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(realtimeSegmentDataManager,
 streamMetadataProvider);
+          Map<String, ConsumerPartitionState> partitionIdToStateMap =
+              
realtimeSegmentDataManager.getConsumerPartitionState(latestMsgOffset);
           Map<String, String> recordsLagMap = new HashMap<>();
           Map<String, String> availabilityLagMsMap = new HashMap<>();
-          
realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k,
 v) -> {
+          Map<String, PartitionLagState> partitionToLagState =
+              
streamMetadataProvider.getCurrentPartitionLagState(partitionIdToStateMap);
+          partitionToLagState.forEach((k, v) -> {
             recordsLagMap.put(k, v.getRecordsLag());
             availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
           });
@@ -1114,15 +1123,12 @@ public class TablesResource {
           segmentConsumerInfoList.add(new 
SegmentConsumerInfo(segmentDataManager.getSegmentName(),
               realtimeSegmentDataManager.getConsumerState().toString(),
               realtimeSegmentDataManager.getLastConsumedTimestamp(), 
partitiionToOffsetMap,
-              new 
SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, 
partitionStateMap.entrySet().stream()
+              new 
SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, 
partitionIdToStateMap.entrySet()
+                  .stream()
                   .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().getUpstreamLatestOffset().toString())),
                   recordsLagMap, availabilityLagMsMap)));
         }
       }
-    } catch (ConcurrentModificationException e) {
-      LOGGER.warn("Multi-threaded access is unsafe for KafkaConsumer, caught 
exception when fetching stream offset",
-          e);
-      return segmentConsumerInfoList;
     } catch (Exception e) {
       throw new WebApplicationException("Caught exception when getting 
consumer info for table: " + realtimeTableName);
     } finally {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 01f429a5117..13dd8075841 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -24,6 +24,9 @@ import java.util.Set;
 import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
@@ -56,7 +59,8 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
   }
 
   @Override
-  protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager) {
+  protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
+      RealtimeTableDataManager realtimeTableDataManager) {
     long now = now();
     long latestIngestionTimestamp =
         
rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
@@ -73,7 +77,12 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     // message is too old to pass the freshness check. We check this condition 
separately to avoid hitting
     // the stream consumer to check partition count if we're already caught up.
     StreamPartitionMsgOffset currentOffset = 
rtSegmentDataManager.getCurrentOffset();
-    StreamPartitionMsgOffset latestStreamOffset = 
rtSegmentDataManager.fetchLatestStreamOffset(5000);
+
+    StreamMetadataProvider streamMetadataProvider =
+        
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+    StreamPartitionMsgOffset latestStreamOffset =
+        
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager, 
streamMetadataProvider);
+
     if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
       _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
               + "But the current ingested offset is equal to the latest 
available offset {}.", segmentName, freshnessMs,
@@ -81,21 +90,18 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
       return true;
     }
 
-    StreamPartitionMsgOffset earliestStreamOffset = 
rtSegmentDataManager.fetchEarliestStreamOffset(5000);
-
     long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
     if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
       _logger.warn("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
-              + "But the current ingested offset {} has been idle for {}ms. At 
offset {}. Earliest offset {}. "
-              + "Latest offset {}.", segmentName, freshnessMs, 
_minFreshnessMs, currentOffset, idleTimeMs,
-          currentOffset,
-          earliestStreamOffset, latestStreamOffset);
+              + "But the current ingested offset {} has been idle for {}ms. At 
offset {}. " + "Latest offset {}.",
+          segmentName, freshnessMs, _minFreshnessMs, currentOffset, 
idleTimeMs, currentOffset, latestStreamOffset);
       return true;
     }
 
     _logger.info("Segment {} with freshness {}ms has not caught up within "
-            + "min freshness {}. At offset {}. Earliest offset {}. Latest 
offset {}.", segmentName, freshnessMs,
-        _minFreshnessMs, currentOffset, earliestStreamOffset, 
latestStreamOffset);
+            + "min freshness {}. At offset {}. Latest offset {}.", 
segmentName, freshnessMs, _minFreshnessMs,
+        currentOffset,
+        latestStreamOffset);
     return false;
   }
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index df8081f136a..56ed05920e3 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -93,7 +94,7 @@ public abstract class IngestionBasedConsumptionStatusChecker {
             continue;
           }
           RealtimeSegmentDataManager rtSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
-          if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+          if (isSegmentCaughtUp(segName, rtSegmentDataManager, 
(RealtimeTableDataManager) tableDataManager)) {
             caughtUpSegments.add(segName);
           }
         } finally {
@@ -135,7 +136,8 @@ public abstract class 
IngestionBasedConsumptionStatusChecker {
     return numLaggingSegments;
   }
 
-  protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager);
+  protected abstract boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
+      RealtimeTableDataManager realtimeTableDataManager);
 
   protected boolean isOffsetCaughtUp(String segmentName,
       StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset 
latestOffset) {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
index b4f2ba12e25..e38f4fbbe5f 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusChecker.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
@@ -42,7 +43,8 @@ public class OffsetBasedConsumptionStatusChecker extends 
IngestionBasedConsumpti
   }
 
   @Override
-  protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager) {
+  protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
+      RealtimeTableDataManager realtimeTableDataManager) {
     StreamPartitionMsgOffset latestIngestedOffset = 
rtSegmentDataManager.getCurrentOffset();
     StreamPartitionMsgOffset latestStreamOffset = 
rtSegmentDataManager.getLatestStreamOffsetAtStartupTime();
 
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 6b10fd1b36a..40b3a40e2e9 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -28,12 +28,15 @@ import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -75,8 +78,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -104,9 +107,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrB0.getSegment()).thenReturn(mockSegment);
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(200));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(2000));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
+
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     //              current offset          latest stream offset    current 
time    last ingestion time
@@ -128,8 +144,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
   }
 
   @Test
-  public void testWithDroppedTableAndSegment()
-      throws InterruptedException {
+  public void testWithDroppedTableAndSegment() {
     String segA0 = "tableA__0__0__123Z";
     String segA1 = "tableA__1__0__123Z";
     String segB0 = "tableB__0__0__123Z";
@@ -146,7 +161,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null);
 
@@ -155,7 +170,12 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     // ensure negative values are ignored
     setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
@@ -202,8 +222,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -215,9 +235,21 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(200));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(2000));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -268,8 +300,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -281,9 +313,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
@@ -344,8 +389,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -357,9 +402,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
@@ -396,8 +454,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -409,9 +467,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(200));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(2000));
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
+
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -459,8 +530,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = 
mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = 
mock(RealtimeTableDataManager.class);
     
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -472,9 +543,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(20));
-    when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(200));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(null);
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+
+    when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+    when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+
+    when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
+    when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of());
+
     when(segMngrA0.getCurrentOffset()).thenReturn(null);
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -495,7 +579,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     // segB0              0                       0                      100   
            0
     setupLatestIngestionTimestamp(segMngrA0, 89L);
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
-    when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new 
LongMsgOffset(0));
+    when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index 2248f731d2d..e63b10fafa0 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.testng.annotations.Test;
 
@@ -52,8 +52,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
             ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
     when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -106,8 +106,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
     when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -166,8 +166,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
             ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
     when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
@@ -221,8 +221,8 @@ public class OffsetBasedConsumptionStatusCheckerTest {
             ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments));
 
     // setup TableDataMangers
-    TableDataManager tableDataManagerA = mock(TableDataManager.class);
-    TableDataManager tableDataManagerB = mock(TableDataManager.class);
+    RealtimeTableDataManager tableDataManagerA = mock(RealtimeTableDataManager.class);
+    RealtimeTableDataManager tableDataManagerB = mock(RealtimeTableDataManager.class);
     when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
     when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 630f69e3cf0..6502ea1a474 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -54,6 +54,10 @@ public abstract class StreamConsumerFactory {
    */
   public abstract StreamMetadataProvider createStreamMetadataProvider(String clientId);
 
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId, boolean concurrentAccessExpected) {
+    return createStreamMetadataProvider(clientId);
+  }
+
   public StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory() {
     return new LongMsgOffsetFactory();
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 0ef42b8dde1..4b5666a7a35 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -55,6 +55,9 @@ public interface StreamMetadataProvider extends Closeable {
 
   /**
    * Fetches the latest offset for a set of given partition Ids.
+   * Implementations may not be thread-safe. Callers that need concurrent access
+   * should use a thread-safe wrapper.
+   *
    * @param partitionIds partition Ids of the stream
    * @param timeoutMillis fetch timeout
    * @return latest {@link StreamPartitionMsgOffset} for each partition Id.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to