This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f73e756d06d Fix Realtime Ingestion Metrics (#16783)
f73e756d06d is described below

commit f73e756d06d7ba5aa6592e6d77a53401848fae94
Author: NOOB <[email protected]>
AuthorDate: Tue Oct 7 15:54:03 2025 +0530

    Fix Realtime Ingestion Metrics (#16783)
    
    * Enhance and fix realtime ingestion delay calculation
    
    * wip
    
    * wip
    
    * updates tracker
    
    * Fixes tracker test
    
    * Fixes lint and tests
    
    * Removes endToEndIngestionDelay
    
    * updates code comments and improve perf
    
    * Adds asserts
    
    * nit
    
    * refactors code
    
    * Adds condition to check _isServerReadyToServeQueries
    
    * Updates code doc
    
    * Adds test to verify metrics
    
    * updates test
    
    * supports multi-stream ingestion tracking
    
    * Adds test for fetching latest stream offset
    
    * Updates tests and adds metric description
    
    * Fixes lint
    
    * Removes e2e ingestion delay ms gauge
    
    * nit
    
    * Misc fixes and refactoring
    
    * misc fixes
    
    * minor refactoring
    
    * refactoring and perf fix
    
    * Fixes tests
    
    * Addresses comments
    
    * continue if failed to create stream metadata provider
    
    * Adds ability to update stream metadata provider
    
    * fixes lint
    
    * Adds metrics and code comments
    
    * minor refactoring
    
    * Adds test for IngestionConfigUtils
    
    * Fixes currentOffset bug
    
    * restore StreamMessageMetadata class
    
    * nit
---
 .../apache/pinot/common/metrics/ServerGauge.java   |  12 +-
 .../apache/pinot/common/metrics/ServerMeter.java   |   4 +-
 .../apache/pinot/common/metrics/ServerTimer.java   |   5 +-
 .../prometheus/ServerPrometheusMetricsTest.java    |   5 +-
 .../manager/realtime/IngestionDelayTracker.java    | 470 ++++++++++++++-------
 .../realtime/RealtimeSegmentDataManager.java       |   9 +-
 .../manager/realtime/RealtimeTableDataManager.java |  27 +-
 .../realtime/IngestionDelayTrackerTest.java        | 329 ++++++++++++---
 .../fakestream/FakeStreamMetadataProvider.java     |  18 +
 .../kafka20/KafkaStreamMetadataProvider.java       |  28 ++
 .../kafka20/KafkaPartitionLevelConsumerTest.java   |  26 +-
 .../kafka30/KafkaStreamMetadataProvider.java       |  28 ++
 .../kafka30/KafkaPartitionLevelConsumerTest.java   |  26 +-
 .../kinesis/KinesisStreamMetadataProvider.java     |   5 +
 .../pulsar/PulsarStreamMetadataProvider.java       |   5 +
 .../pinot/spi/stream/StreamMetadataProvider.java   |  17 +
 .../pinot/spi/utils/IngestionConfigUtils.java      |  24 ++
 .../pinot/spi/utils/IngestionConfigUtilsTest.java  |  25 ++
 18 files changed, 791 insertions(+), 272 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index dcc8d6776c5..a0401497b3c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -85,9 +85,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
    */
   NETTY_POOLED_THREADLOCALCACHE("bytes", true),
   NETTY_POOLED_CHUNK_SIZE("bytes", true),
-  // Ingestion delay metrics
-  REALTIME_INGESTION_DELAY_MS("milliseconds", false),
-  END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
   LUCENE_INDEXING_DELAY_MS("milliseconds", false),
   LUCENE_INDEXING_DELAY_DOCS("documents", false),
   // Needed to track if valid doc id snapshots are present for faster restarts
@@ -95,9 +92,12 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   UPSERT_QUERYABLE_DOC_ID_SNAPSHOT_COUNT("upsertQueryableDocIdSnapshotCount", 
false),
   UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", 
false),
   UPSERT_QUERYABLE_DOCS_IN_SNAPSHOT_COUNT("upsertQueryableDocIdsInSnapshot", 
false),
-  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
-  REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
-  REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
+  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false,
+      "The difference between the latest message offset and the last consumed
message offset."),
+  REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false, "The offset of 
the latest message in the upstream."),
+  REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of 
the last consumed message."),
+  REALTIME_INGESTION_DELAY_MS("milliseconds", false,
+      "The difference between the current timestamp and the timestamp of the
last consumed message record."),
   REALTIME_CONSUMER_DIR_USAGE("bytes", true),
   SEGMENT_DOWNLOAD_SPEED("bytes", true),
   PREDOWNLOAD_SPEED("bytes", true),
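
[For context, the offset-lag gauge above is derived from the other two gauges:
lag = upstream offset - consuming offset, floored at zero. A minimal sketch of
that computation in plain Java; names are illustrative, not Pinot's
LongMsgOffset arithmetic:

    // Sketch of the offset-lag math described by the gauge descriptions above.
    public final class OffsetLagExample {
      static long offsetLag(long latestUpstreamOffset, long lastConsumedOffset) {
        // Floor at zero: the two offsets are read at different times, so a
        // transiently stale upstream read must not produce a negative lag.
        return Math.max(0, latestUpstreamOffset - lastConsumedOffset);
      }

      public static void main(String[] args) {
        System.out.println(offsetLag(1050, 1000)); // 50 messages behind
        System.out.println(offsetLag(1000, 1005)); // clamped to 0
      }
    }
]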
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index fbf006ec3d0..a10f25661c8 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -224,7 +224,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
    */
   MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
   // Workload Budget exceeded counter
-  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times 
workload budget exceeded");
+  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times 
workload budget exceeded"),
+  INGESTION_DELAY_TRACKING_ERRORS("errors", false,
+      "Number of errors encountered while tracking ingestion delay.");
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 0f72acd8264..5c6604ad193 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -106,7 +106,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   SEGMENT_DOWNLOAD_FROM_DEEP_STORE_TIME_MS("millis", false,
       "Time taken to download a segment from deep store (including untar and 
move operations)"),
   SEGMENT_DOWNLOAD_FROM_PEERS_TIME_MS("millis", false,
-      "Time taken to download a segment from peers (including untar and move 
operations)");
+      "Time taken to download a segment from peers (including untar and move 
operations)"),
+
+  // Ingestion metrics
+  INGESTION_DELAY_TRACKING_MS("milliseconds", false, "Time taken to run a 
trackIngestionDelay cycle");
 
   private final String _timerName;
   private final boolean _global;
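
[The new INGESTION_DELAY_TRACKING_MS timer is recorded once per tracking
cycle, including failed cycles. A small sketch of that measure-in-finally
pattern; the recorder callback stands in for
ServerMetrics.addTimedTableValue and is not the actual Pinot wiring:

    import java.util.function.LongConsumer;

    // Sketch: run one tracking cycle and always record the elapsed time,
    // mirroring the try/finally in trackIngestionDelay().
    final class TimedCycleExample {
      static void runTimed(Runnable cycle, LongConsumer recordElapsedMs) {
        long startMs = System.currentTimeMillis();
        try {
          cycle.run();
        } finally {
          recordElapsedMs.accept(System.currentTimeMillis() - startMs);
        }
      }
    }
]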
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 4a4a3c14407..79fb88f56ed 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -55,9 +55,8 @@ public abstract class ServerPrometheusMetricsTest extends 
PinotPrometheusMetrics
   private static final List<ServerGauge> GAUGES_ACCEPTING_PARTITION =
       List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, 
ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
           ServerGauge.REALTIME_INGESTION_OFFSET_LAG, 
ServerGauge.REALTIME_INGESTION_DELAY_MS,
-          ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, 
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
-          ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
-          ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+          ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, 
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+          ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, 
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
 
   private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
       List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, 
ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 90e19ead18f..cbdda8b8903 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager.realtime;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -30,30 +31,34 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A Class to track realtime ingestion delay for table partitions on a given 
server.
  * Highlights:
  * 1-An object of this class is hosted by each RealtimeTableDataManager.
  * 2-The object tracks ingestion delays for all partitions hosted by the 
current server for the given Realtime table.
- * 3-Partition delays are updated by all RealtimeSegmentDataManager objects 
hosted in the corresponding
- *   RealtimeTableDataManager.
+ * 3-The current consumption status of each partition is updated by the
RealtimeSegmentDataManager objects hosted in the
+ * corresponding RealtimeTableDataManager.
  * 4-Individual metrics are associated with each partition being tracked.
  * 5-Delays for partitions that do not have events to consume are reported as 
zero.
  * 6-Partitions whose Segments go from CONSUMING to DROPPED state stop being 
tracked so their delays do not cloud
@@ -63,6 +68,8 @@ import org.slf4j.LoggerFactory;
  *   partition. If not, we stop tracking the respective partition.
  * 8-A scheduled executor thread is started by this object to track timeouts 
of partitions and drive the reading
  * of their ideal state.
+ * 9-A scheduled executor thread is started by this object to fetch the latest 
stream offset from upstream and create
+ * metrics for new partitions for which there is no consumer present on the 
server.
  *
  *  The following diagram illustrates the object interactions with main 
external APIs
  *
@@ -82,139 +89,253 @@ import org.slf4j.LoggerFactory;
  *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
  *   |___________________|
  *
- * TODO: handle bug situations like the one where a partition is not allocated 
to a given server due to a bug.
  */
 
 public class IngestionDelayTracker {
 
   private static class IngestionInfo {
-    final long _ingestionTimeMs;
-    final long _firstStreamIngestionTimeMs;
-    final StreamPartitionMsgOffset _currentOffset;
-    final StreamPartitionMsgOffset _latestOffset;
+    volatile long _ingestionTimeMs;
+    @Nullable
+    volatile StreamPartitionMsgOffset _currentOffset;
+
+    IngestionInfo(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset) {
+      _ingestionTimeMs = ingestionTimeMs;
+      _currentOffset = currentOffset;
+    }
 
-    IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
-        @Nullable StreamPartitionMsgOffset currentOffset, @Nullable 
StreamPartitionMsgOffset latestOffset) {
+    void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset) {
       _ingestionTimeMs = ingestionTimeMs;
-      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
       _currentOffset = currentOffset;
-      _latestOffset = latestOffset;
     }
   }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IngestionDelayTracker.class);
-
   // Sleep interval for scheduled executor service thread that triggers read 
of ideal state
-  private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 
300000; // 5 minutes +/- precision in timeouts
+  // 5 minutes +/- precision in timeouts
+  private static final long METRICS_REMOVAL_INTERVAL_MS = 
TimeUnit.MINUTES.toMillis(5);
+  // 30 seconds +/- precision in timeouts
+  private static final long METRICS_TRACKING_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(30);
   // Once a partition is marked for verification, we wait 10 minutes to pull 
its ideal state.
-  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 
minutes timeouts
+  private static final long PARTITION_TIMEOUT_MS = 
TimeUnit.MINUTES.toMillis(10);
   // Delay scheduled executor service for this amount of time after starting 
service
   private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
-
   // Cache expire time for ignored segment if there is no update from the 
segment.
   private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
-
-  // Per partition info for all partitions active for the current table.
-  private final Map<Integer, IngestionInfo> _ingestionInfoMap = new 
ConcurrentHashMap<>();
-
-  // We mark partitions that go from CONSUMING to ONLINE in 
_partitionsMarkedForVerification: if they do not
-  // go back to CONSUMING in some period of time, we verify whether they are 
still hosted in this server by reading
-  // ideal state. This is done with the goal of minimizing reading ideal state 
for efficiency reasons.
-  // TODO: Consider removing this mechanism after releasing 1.2.0, and use 
{@link #stopTrackingPartitionIngestionDelay}
-  //       instead.
-  private final Map<Integer, Long> _partitionsMarkedForVerification = new 
ConcurrentHashMap<>();
-
-  private final Cache<String, Boolean> _segmentsToIgnore =
-      
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, 
TimeUnit.MINUTES).build();
-
-  // TODO: Make thread pool a server/cluster level config
-  // ScheduledExecutorService to check partitions that are inactive against 
ideal state.
-  private final ScheduledExecutorService _scheduledExecutor = 
Executors.newScheduledThreadPool(2);
+  // Timeout after 5 seconds while fetching the latest stream offset.
+  private static final long LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
 
   private final ServerMetrics _serverMetrics;
   private final String _tableNameWithType;
   private final String _metricName;
-
   private final RealtimeTableDataManager _realTimeTableDataManager;
   private final Supplier<Boolean> _isServerReadyToServeQueries;
+  private final Cache<String, Boolean> _segmentsToIgnore =
+      
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, 
TimeUnit.MINUTES).build();
+  // Map to describe the partitions for which the metrics are being reported.
+  // This map is accessed by:
+  // 1. _ingestionDelayTrackingScheduler thread.
+  // 2. All threads which can remove metrics - Consumer thread, Helix Thread,
Server API Thread, etc.
+  // 3. Consumer thread when it updates the ingestion info of the partition 
for the first time.
+  private final Map<Integer, Boolean> _partitionsTracked = new 
ConcurrentHashMap<>();
+  // Map to hold the ingestion info reported by the consumer.
+  // This map is accessed by:
+  // 1. _ingestionDelayTrackingScheduler thread.
+  // 2. All threads which can remove metrics - Consumer thread, Helix Thread,
Server API Thread, etc.
+  // 3. Consumer thread when it updates the ingestion info of the partition.
+  private final Map<Integer, IngestionInfo> _ingestionInfoMap = new 
ConcurrentHashMap<>();
+  private final Map<Integer, Long> _partitionsMarkedForVerification = new 
ConcurrentHashMap<>();
+  private final ScheduledExecutorService _metricsRemovalScheduler;
+  private final ScheduledExecutorService _ingestionDelayTrackingScheduler;
 
-  private Clock _clock;
+  private Clock _clock = Clock.systemUTC();
+
+  protected volatile Map<Integer, StreamPartitionMsgOffset> 
_partitionIdToLatestOffset;
+  protected volatile Set<Integer> _partitionsHostedByThisServer = new 
HashSet<>();
+  // Map of StreamMetadataProvider to fetch the latest stream offset from
upstream (a table can have multiple upstream topics)
+  // This map is accessed by:
+  // 1. _ingestionDelayTrackingScheduler thread.
+  // 2. All threads which can remove metrics - Consumer thread, Helix Thread,
Server API Thread, etc.
+  protected Map<Integer, StreamMetadataProvider> 
_streamConfigIndexToStreamMetadataProvider = new ConcurrentHashMap<>();
 
-  @VisibleForTesting
   public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
-      RealtimeTableDataManager realtimeTableDataManager, int 
scheduledExecutorThreadTickIntervalMs,
-      Supplier<Boolean> isServerReadyToServeQueries)
+      RealtimeTableDataManager realtimeTableDataManager)
       throws RuntimeException {
+    this(serverMetrics, tableNameWithType, realtimeTableDataManager, 
METRICS_REMOVAL_INTERVAL_MS,
+        METRICS_TRACKING_INTERVAL_MS, 
realtimeTableDataManager.getIsServerReadyToServeQueries());
+  }
+
+  @VisibleForTesting
+  public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
+      RealtimeTableDataManager realtimeTableDataManager, long 
metricsRemovalIntervalMs, long metricsTrackingIntervalMs,
+      Supplier<Boolean> isServerReadyToServeQueries) {
     _serverMetrics = serverMetrics;
     _tableNameWithType = tableNameWithType;
     _metricName = tableNameWithType;
     _realTimeTableDataManager = realtimeTableDataManager;
-    _clock = Clock.systemUTC();
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
-    // Handle negative timer values
-    if (scheduledExecutorThreadTickIntervalMs <= 0) {
-      throw new RuntimeException("Illegal timer timeout argument, expected > 
0, got="
-          + scheduledExecutorThreadTickIntervalMs + " for table=" + 
_tableNameWithType);
+
+    _metricsRemovalScheduler = 
Executors.newSingleThreadScheduledExecutor(getThreadFactory(
+        "IngestionDelayMetricsRemovalThread-" + 
TableNameBuilder.extractRawTableName(tableNameWithType)));
+    // schedule periodically to remove inactive partitions.
+    
_metricsRemovalScheduler.scheduleWithFixedDelay(this::timeoutInactivePartitions,
+        INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, metricsRemovalIntervalMs, 
TimeUnit.MILLISECONDS);
+
+    createOrUpdateStreamMetadataProvider();
+
+    _ingestionDelayTrackingScheduler = 
Executors.newSingleThreadScheduledExecutor(
+        getThreadFactory("IngestionDelayTrackingThread-" + 
TableNameBuilder.extractRawTableName(tableNameWithType)));
+    // schedule periodically to update the latest upstream offsets and create
metrics for newly added partitions.
+    
_ingestionDelayTrackingScheduler.scheduleWithFixedDelay(this::trackIngestionDelay,
+        INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, metricsTrackingIntervalMs, 
TimeUnit.MILLISECONDS);
+  }
+
+  private void createOrUpdateStreamMetadataProvider() {
+    List<StreamConfig> streamConfigs =
+        
IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft());
+
+    for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); 
streamConfigIndex++) {
+      if 
(_streamConfigIndexToStreamMetadataProvider.containsKey(streamConfigIndex)) {
+        continue;
+      }
+      StreamConfig streamConfig = null;
+      StreamMetadataProvider streamMetadataProvider;
+      try {
+        streamConfig = streamConfigs.get(streamConfigIndex);
+        StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+        String clientId =
+            
IngestionConfigUtils.getTableTopicUniqueClientId(IngestionDelayTracker.class.getSimpleName(),
 streamConfig);
+        streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(clientId);
+      } catch (Exception e) {
+        LOGGER.error("Failed to create stream metadata provider for 
streamConfig: {}", streamConfig, e);
+        continue;
+      }
+
+      assert streamMetadataProvider != null;
+      _streamConfigIndexToStreamMetadataProvider.put(streamConfigIndex, 
streamMetadataProvider);
+
+      if ((streamMetadataProvider.supportsOffsetLag()) && 
(_partitionIdToLatestOffset == null)) {
+        _partitionIdToLatestOffset = new ConcurrentHashMap<>();
+      }
     }
+  }
+
+  private void trackIngestionDelay() {
+    long startMs = System.currentTimeMillis();
+    try {
+      if (!_isServerReadyToServeQueries.get() || 
_realTimeTableDataManager.isShutDown()) {
+        // Do not update the ingestion delay metrics during server startup 
period
+        // or once the table data manager has been shutdown.
+        return;
+      }
+      Set<Integer> partitionsHosted = _partitionsHostedByThisServer;
+
+      if (_ingestionInfoMap.size() > partitionsHosted.size()) {
+        // In case a new partition got assigned to the server before
_partitionsHostedByThisServer was updated.
+        partitionsHosted.addAll(_ingestionInfoMap.keySet());
+      }
+
+      if (partitionsHosted.isEmpty()) {
+        return;
+      }
+
+      updateLatestStreamOffset(partitionsHosted);
+
+      for (Integer partitionId : partitionsHosted) {
+        _partitionsTracked.computeIfAbsent(partitionId, k -> {
+          createMetrics(partitionId);
+          return true;
+        });
+      }
+    } catch (Throwable t) {
+      LOGGER.error("Failed to track ingestion delay metrics.", t);
+      
_serverMetrics.addMeteredTableValue(_realTimeTableDataManager.getTableName(),
+          ServerMeter.INGESTION_DELAY_TRACKING_ERRORS, 1);
+    } finally {
+      _serverMetrics.addTimedTableValue(_metricName, 
ServerTimer.INGESTION_DELAY_TRACKING_MS,
+          System.currentTimeMillis() - startMs, TimeUnit.MILLISECONDS);
+    }
+  }
 
-    // ThreadFactory to set the thread's name
-    ThreadFactory threadFactory = new ThreadFactory() {
+  @VisibleForTesting
+  void updateLatestStreamOffset(Set<Integer> partitionsHosted) {
+    Map<Integer, Set<Integer>> streamIndexToStreamPartitionIds =
+        
IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(partitionsHosted);
+
+    if (streamIndexToStreamPartitionIds.size() > 
_streamConfigIndexToStreamMetadataProvider.size()) {
+      // A new stream config might have been added, or we might need to retry
creating a streamMetadataProvider
+      // that failed before.
+      createOrUpdateStreamMetadataProvider();
+    }
+
+    for (Map.Entry<Integer, StreamMetadataProvider> entry : 
_streamConfigIndexToStreamMetadataProvider.entrySet()) {
+      int streamIndex = entry.getKey();
+      StreamMetadataProvider streamMetadataProvider = entry.getValue();
+
+      if (!streamIndexToStreamPartitionIds.containsKey(streamIndex)) {
+        // Server is not hosting any partitions of this stream.
+        continue;
+      }
+
+      if (streamMetadataProvider.supportsOffsetLag()) {
+        Set<Integer> streamPartitionIds = 
streamIndexToStreamPartitionIds.get(streamIndex);
+        try {
+          Map<Integer, StreamPartitionMsgOffset> 
streamPartitionIdToLatestOffset =
+              
streamMetadataProvider.fetchLatestStreamOffset(streamPartitionIds,
+                  LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS);
+          if (streamIndex > 0) {
+            // Need to convert stream partition Ids back to pinot partition 
Ids.
+            for (Map.Entry<Integer, StreamPartitionMsgOffset> latestOffsetEntry
+                : streamPartitionIdToLatestOffset.entrySet()) {
+              _partitionIdToLatestOffset.put(
+                  
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(latestOffsetEntry.getKey(),
+                      streamIndex), latestOffsetEntry.getValue());
+            }
+          } else {
+            _partitionIdToLatestOffset.putAll(streamPartitionIdToLatestOffset);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Failed to update latest stream offsets for partitions: 
{}", streamPartitionIds, e);
+        }
+      }
+    }
+  }
+
+  private ThreadFactory getThreadFactory(String threadName) {
+    return new ThreadFactory() {
       private final ThreadFactory _defaultFactory = 
Executors.defaultThreadFactory();
 
       @Override
       public Thread newThread(Runnable r) {
         Thread thread = _defaultFactory.newThread(r);
-        thread.setName("IngestionDelayTimerThread-" + 
TableNameBuilder.extractRawTableName(tableNameWithType));
+        thread.setName(threadName);
         return thread;
       }
     };
-    ((ScheduledThreadPoolExecutor) 
_scheduledExecutor).setThreadFactory(threadFactory);
-
-    _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
-        INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, 
scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
   }
 
-  public IngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
-      RealtimeTableDataManager tableDataManager, Supplier<Boolean> 
isServerReadyToServeQueries) {
-    this(serverMetrics, tableNameWithType, tableDataManager, 
SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
-        isServerReadyToServeQueries);
-  }
-
-  /*
-   * Helper function to get the ingestion delay for a given ingestion time.
-   * Ingestion delay == Current Time - Ingestion Time
-   *
-   * @param ingestionTimeMs original ingestion time in milliseconds.
-   */
-  private long getIngestionDelayMs(long ingestionTimeMs) {
-    if (ingestionTimeMs < 0) {
-      return 0;
-    }
-    // Compute aged delay for current partition
-    long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
-    // Correct to zero for any time shifts due to NTP or time reset.
-    agedIngestionDelayMs = Math.max(agedIngestionDelayMs, 0);
-    return agedIngestionDelayMs;
-  }
-
-  /*
+  /**
    * Helper function to be called when we should stop tracking a given 
partition. Removes the partition from
    * all our maps.
    *
    * @param partitionId partition ID which we should stop tracking.
    */
   private void removePartitionId(int partitionId) {
-    _ingestionInfoMap.compute(partitionId, (k, v) -> {
+    _partitionsTracked.compute(partitionId, (k, v) -> {
       if (v != null) {
+        int streamConfigIndex = 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
+        StreamMetadataProvider streamMetadataProvider =
+            _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
         // Remove all metrics associated with this partition
+        if (streamMetadataProvider != null && 
streamMetadataProvider.supportsOffsetLag()) {
+          _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+          _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
+          _serverMetrics.removePartitionGauge(_metricName, partitionId,
+              ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+        }
         _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_DELAY_MS);
-        _serverMetrics.removePartitionGauge(_metricName, partitionId,
-            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
-        _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
-        _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
-        _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
         LOGGER.info("Successfully removed ingestion metrics for partition id: 
{}", partitionId);
       }
+      _ingestionInfoMap.remove(partitionId);
       return null;
     });
 
@@ -222,7 +343,7 @@ public class IngestionDelayTracker {
     _partitionsMarkedForVerification.remove(partitionId);
   }
 
-  /*
+  /**
    * Helper functions that creates a list of all the partitions that are 
marked for verification and whose
    * timeouts are expired. This helps us optimize checks of the ideal state.
    */
@@ -248,27 +369,41 @@ public class IngestionDelayTracker {
     _clock = clock;
   }
 
+  public void createMetrics(int partitionId) {
+    int streamConfigIndex = 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
+    StreamMetadataProvider streamMetadataProvider = 
_streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
+
+    if (streamMetadataProvider != null && 
streamMetadataProvider.supportsOffsetLag()) {
+      _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
+          () -> getPartitionIngestionOffsetLag(partitionId));
+
+      _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+          ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> 
getPartitionIngestionConsumingOffset(partitionId));
+
+      _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+          ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> 
getLatestPartitionOffset(partitionId));
+    }
+    _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_DELAY_MS,
+        () -> getPartitionIngestionDelayMs(partitionId));
+  }
+
   /**
    * Called by RealTimeSegmentDataManagers to update the ingestion delay 
metrics for a given partition.
    *
    * @param segmentName name of the consuming segment
    * @param partitionId partition id of the consuming segment (directly passed 
in to avoid parsing the segment name)
    * @param ingestionTimeMs ingestion time of the last consumed message (from 
{@link StreamMessageMetadata})
-   * @param firstStreamIngestionTimeMs ingestion time of the last consumed 
message in the first stream (from
-   *                                   {@link StreamMessageMetadata})
    * @param currentOffset offset of the last consumed message (from {@link 
StreamMessageMetadata})
-   * @param latestOffset offset of the latest message in the partition (from 
{@link StreamMetadataProvider})
    */
-  public void updateIngestionMetrics(String segmentName, int partitionId, long 
ingestionTimeMs,
-      long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset,
-      @Nullable StreamPartitionMsgOffset latestOffset) {
+  public void updateMetrics(String segmentName, int partitionId, long 
ingestionTimeMs,
+      @Nullable StreamPartitionMsgOffset currentOffset) {
     if (!_isServerReadyToServeQueries.get() || 
_realTimeTableDataManager.isShutDown()) {
       // Do not update the ingestion delay metrics during server startup period
       // or once the table data manager has been shutdown.
       return;
     }
 
-    if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && 
(currentOffset == null || latestOffset == null)) {
+    if ((ingestionTimeMs < 0) && (currentOffset == null)) {
       // Do not publish metrics if stream does not return valid ingestion time 
or offset.
       return;
     }
@@ -279,45 +414,27 @@ public class IngestionDelayTracker {
         return v;
       }
       if (v == null) {
-        // Add metric when we start tracking a partition. Only publish the 
metric if supported by the stream.
-        if (ingestionTimeMs > 0) {
-          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_DELAY_MS,
-              () -> getPartitionIngestionDelayMs(partitionId));
-        }
-        if (firstStreamIngestionTimeMs > 0) {
-          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
-              ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
-              () -> getPartitionEndToEndIngestionDelayMs(partitionId));
-        }
-        if (currentOffset != null && latestOffset != null) {
-          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
-              () -> getPartitionIngestionOffsetLag(partitionId));
-        }
-
-        if (currentOffset != null) {
-          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
-              ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> 
getPartitionIngestionConsumingOffset(partitionId));
-        }
-
-        if (latestOffset != null) {
-          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
-              ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> 
getPartitionIngestionUpstreamOffset(partitionId));
-        }
+        _partitionsTracked.computeIfAbsent(partitionId, pid -> {
+          createMetrics(partitionId);
+          return true;
+        });
+        return new IngestionInfo(ingestionTimeMs, currentOffset);
       }
-      return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, 
currentOffset, latestOffset);
+      v.update(ingestionTimeMs, currentOffset);
+      return v;
     });
 
     // If we are consuming we do not need to track this partition for removal.
     _partitionsMarkedForVerification.remove(partitionId);
   }
 
-  /*
+  /**
    * Handle partition removal event. This must be invoked when we stop serving 
a given partition for
    * this table in the current server.
    *
    * @param partitionId partition id that we should stop tracking.
    */
-  public void stopTrackingPartitionIngestionDelay(int partitionId) {
+  public void stopTrackingPartition(int partitionId) {
     removePartitionId(partitionId);
   }
 
@@ -327,7 +444,7 @@ public class IngestionDelayTracker {
    *
    * @return Set of partitionIds for which ingestion metrics were removed.
    */
-  public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
+  public Set<Integer> stopTrackingAllPartitions() {
     Set<Integer> removedPartitionIds = new 
HashSet<>(_ingestionInfoMap.keySet());
     for (Integer partitionId : _ingestionInfoMap.keySet()) {
       removePartitionId(partitionId);
@@ -340,12 +457,12 @@ public class IngestionDelayTracker {
    * when we want to stop tracking the ingestion delay for a partition when 
the segment might still be consuming, e.g.
    * when the new consuming segment is created on a different server.
    */
-  public void stopTrackingPartitionIngestionDelay(String segmentName) {
+  public void stopTrackingPartition(String segmentName) {
     _segmentsToIgnore.put(segmentName, true);
     removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
   }
 
-  /*
+  /**
    * This method is used for timing out inactive partitions, so we don't 
display their metrics on current server.
    * When the inactive time exceeds some threshold, we read from ideal state 
to confirm we still host the partition,
    * if not we remove the partition from being tracked locally.
@@ -377,6 +494,7 @@ public class IngestionDelayTracker {
         removePartitionId(partitionId);
       }
     }
+    _partitionsHostedByThisServer = partitionsHostedByThisServer;
   }
 
   /**
@@ -391,7 +509,7 @@ public class IngestionDelayTracker {
     _partitionsMarkedForVerification.put(new 
LLCSegmentName(segmentName).getPartitionGroupId(), _clock.millis());
   }
 
-  /*
+  /**
    * Method to get timestamp used for the ingestion delay for a given 
partition.
    *
    * @param partitionId partition for which we are retrieving the delay
@@ -403,7 +521,7 @@ public class IngestionDelayTracker {
     return ingestionInfo != null ? ingestionInfo._ingestionTimeMs : 
Long.MIN_VALUE;
   }
 
-  /*
+  /**
    * Method to get ingestion delay for a given partition.
    *
    * @param partitionId partition for which we are retrieving the delay
@@ -412,79 +530,103 @@ public class IngestionDelayTracker {
    */
   public long getPartitionIngestionDelayMs(int partitionId) {
     IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    return ingestionInfo != null ? 
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
+    long ingestionTimeMs = 0;
+    if ((ingestionInfo != null) && (ingestionInfo._ingestionTimeMs > 0)) {
+      ingestionTimeMs = ingestionInfo._ingestionTimeMs;
+    }
+    // Compute aged delay for current partition
+    long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
+    // Correct to zero for any time shifts due to NTP or time reset.
+    return Math.max(agedIngestionDelayMs, 0);
   }
 
-  /*
-   * Method to get end to end ingestion delay for a given partition.
-   *
-   * @param partitionId partition for which we are retrieving the delay
+  /**
+   * Computes the ingestion lag for the given partition based on offset 
difference.
+   * <p>
+   * The lag is calculated as the difference between the latest upstream offset
+   * and the current consuming offset. Only {@link LongMsgOffset} types are 
supported.
    *
-   * @return End to end ingestion delay in milliseconds for the given 
partition ID.
+   * @param partitionId partition for which the ingestion lag is computed
+   * @return offset lag for the given partition
    */
-  public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    return ingestionInfo != null ? 
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
-  }
-
   public long getPartitionIngestionOffsetLag(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    if (ingestionInfo == null) {
-      return 0;
-    }
-    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
-    if (currentOffset == null || latestOffset == null) {
+    StreamPartitionMsgOffset latestOffset = 
_partitionIdToLatestOffset.get(partitionId);
+    if (latestOffset == null) {
       return 0;
     }
-    // TODO: Support other types of offsets
-    if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof 
LongMsgOffset)) {
-      return 0;
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    long currentOffset = 0;
+    if (ingestionInfo != null) {
+      StreamPartitionMsgOffset currentMsgOffset = ingestionInfo._currentOffset;
+      if (currentMsgOffset == null) {
+        // If currentOffset is set to null, it means:
+        // 1. The stream does not support offset lag (example: Kinesis). But 
if this was true,
+        // IngestionOffsetLag gauge will not be created and this method will 
never be called.
+        // 2. Server has caught up. Think of scenario where server restarts 
and server is already caught up.
+        return 0;
+      }
+      assert currentMsgOffset instanceof LongMsgOffset;
+      currentOffset = ((LongMsgOffset) currentMsgOffset).getOffset();
     }
-    return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) 
currentOffset).getOffset();
+    assert latestOffset instanceof LongMsgOffset;
+    return Math.max(0, ((LongMsgOffset) latestOffset).getOffset() - 
currentOffset);
   }
 
-  // Get the consuming offset for a given partition
+  /**
+   * Retrieves the latest offset consumed for the given partition.
+   *
+   * @param partitionId partition for which the consuming offset is retrieved
+   * @return consuming offset value for the given partition, or {@code 0} if 
no ingestion info is available
+   */
   public long getPartitionIngestionConsumingOffset(int partitionId) {
     IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
     if (ingestionInfo == null) {
       return 0;
     }
-    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
-    if (currentOffset == null) {
-      return 0;
-    }
-    // TODO: Support other types of offsets
-    if (!(currentOffset instanceof LongMsgOffset)) {
+    StreamPartitionMsgOffset currentMsgOffset = ingestionInfo._currentOffset;
+    if (currentMsgOffset == null) {
+      // If currentOffset is set to null, it means:
+      // 1. The stream does not support offset lag (example: Kinesis). But if
that were true,
+      // the IngestionOffsetLag gauge would not be created and this method
would never be called.
+      // 2. The server has caught up, e.g. the server restarted and was
already caught up.
       return 0;
     }
-    return ((LongMsgOffset) currentOffset).getOffset();
+    assert currentMsgOffset instanceof LongMsgOffset;
+    return ((LongMsgOffset) currentMsgOffset).getOffset();
   }
 
-  // Get the latest offset in upstream data source for a given partition
-  public long getPartitionIngestionUpstreamOffset(int partitionId) {
-    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
-    if (ingestionInfo == null) {
-      return 0;
-    }
-    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+  /**
+   * Retrieves the latest offset in the upstream data source for the given 
partition.
+   *
+   * @param partitionId partition for which the latest upstream offset is 
retrieved
+   * @return latest offset value for the given partition, or {@code 0} if not 
available
+   */
+  public long getLatestPartitionOffset(int partitionId) {
+    StreamPartitionMsgOffset latestOffset = 
_partitionIdToLatestOffset.get(partitionId);
     if (latestOffset == null) {
       return 0;
     }
-    // TODO: Support other types of offsets
-    if (!(latestOffset instanceof LongMsgOffset)) {
-      return 0;
-    }
+    assert latestOffset instanceof LongMsgOffset;
     return ((LongMsgOffset) latestOffset).getOffset();
   }
 
-  /*
+  /**
    * We use this method to clean up when a table is being removed. No updates 
are expected at this time as all
    * RealtimeSegmentManagers should be down now.
    */
   public void shutdown() {
     // Now that segments can't report metric, destroy metric for this table
-    _scheduledExecutor.shutdown(); // ScheduledExecutor is installed in 
constructor so must always be cancelled
+    _metricsRemovalScheduler.shutdown(); // ScheduledExecutor is installed in 
constructor so must always be cancelled
+    if (_ingestionDelayTrackingScheduler != null) {
+      _ingestionDelayTrackingScheduler.shutdown();
+    }
+    for (StreamMetadataProvider streamMetadataProvider : 
_streamConfigIndexToStreamMetadataProvider.values()) {
+      try {
+        streamMetadataProvider.close();
+      } catch (IOException e) {
+        LOGGER.error("Failed to close streamMetadataProvider", e);
+      }
+    }
     if (!_isServerReadyToServeQueries.get()) {
       // Do not update the tracker state during server startup period
       return;
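
[Note the changed fallback in getPartitionIngestionDelayMs(): with no
ingestion sample yet, the timestamp defaults to epoch 0, so the gauge reports
wall-clock millis until the first message is consumed (the updated tests
assert "<= clock.millis()" accordingly). A standalone sketch of the delay
math, with an injected java.time.Clock as in the tracker; illustrative, not
Pinot code:

    import java.time.Clock;

    final class DelayMathExample {
      static long ingestionDelayMs(Clock clock, long ingestionTimeMs) {
        // Missing or invalid samples (<= 0) fall back to epoch 0.
        long ts = ingestionTimeMs > 0 ? ingestionTimeMs : 0;
        // Clamp at zero to absorb NTP corrections or clock resets.
        return Math.max(clock.millis() - ts, 0);
      }

      public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        System.out.println(ingestionDelayMs(clock, clock.millis() - 1500)); // ~1500
        System.out.println(ingestionDelayMs(clock, -1)); // wall-clock millis since epoch
      }
    }
]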
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 597aa40ae2c..17336e89ff2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -2013,12 +2013,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
   private void updateIngestionMetrics(StreamMessageMetadata metadata) {
     if (metadata != null) {
       try {
-        StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, 
true);
         _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, 
_partitionGroupId,
-            metadata.getRecordIngestionTimeMs(), 
metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
-            latestOffset);
+            metadata.getRecordIngestionTimeMs(), metadata.getOffset());
       } catch (Exception e) {
-        _segmentLogger.warn("Failed to fetch latest offset for updating 
ingestion delay", e);
+        _segmentLogger.warn("Failed to update the ingestion metrics", e);
       }
     }
   }
@@ -2029,8 +2027,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
    */
   private void setIngestionDelayToZero() {
     long currentTimeMs = System.currentTimeMillis();
-    _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, 
_partitionGroupId, currentTimeMs, currentTimeMs,
-        null, null);
+    _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, 
_partitionGroupId, currentTimeMs, null);
   }
 
   // This should be done during commit? We may not always commit when we build 
a segment....
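
[With the inline fetchLatestStreamOffset(5000, true) call removed from the
consumer path, upstream offsets are now refreshed by the tracker's scheduler
instead. A hedged sketch of that polling pattern; fetchLatest stands in for
the StreamMetadataProvider call and this is not the actual Pinot wiring:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.LongSupplier;

    // Sketch: move the upstream-offset fetch off the consumer hot path and
    // onto a single-threaded scheduler that tolerates transient failures.
    final class ScheduledOffsetPoller {
      private final AtomicLong _latestOffset = new AtomicLong(-1);
      private final ScheduledExecutorService _scheduler =
          Executors.newSingleThreadScheduledExecutor();

      void start(LongSupplier fetchLatest, long periodMs) {
        _scheduler.scheduleWithFixedDelay(() -> {
          try {
            _latestOffset.set(fetchLatest.getAsLong());
          } catch (Exception e) {
            // Swallow and retry on the next tick, as the tracker does
            // (it meters the failure and keeps the scheduler alive).
          }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
      }

      long latestOffset() {
        return _latestOffset.get();
      }

      void shutdown() {
        _scheduler.shutdown();
      }
    }
]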
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 406712e9d68..69184e2bfc0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -84,7 +84,6 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -160,8 +159,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   protected void doInit() {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, 
_serverMetrics, _tableNameWithType);
     // Tracks ingestion delay of all partitions being served for this table
-    _ingestionDelayTracker =
-        new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, 
_isServerReadyToServeQueries);
+    _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, 
_tableNameWithType, this);
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -294,16 +292,11 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    * @param segmentName name of the consuming segment
    * @param partitionId partition id of the consuming segment (directly passed 
in to avoid parsing the segment name)
    * @param ingestionTimeMs ingestion time of the last consumed message (from 
{@link StreamMessageMetadata})
-   * @param firstStreamIngestionTimeMs ingestion time of the last consumed 
message in the first stream (from
-   *                                   {@link StreamMessageMetadata})
    * @param currentOffset offset of the last consumed message (from {@link 
StreamMessageMetadata})
-   * @param latestOffset offset of the latest message in the partition (from 
{@link StreamMetadataProvider})
    */
   public void updateIngestionMetrics(String segmentName, int partitionId, long 
ingestionTimeMs,
-      long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset 
currentOffset,
-      @Nullable StreamPartitionMsgOffset latestOffset) {
-    _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, 
ingestionTimeMs, firstStreamIngestionTimeMs,
-        currentOffset, latestOffset);
+      @Nullable StreamPartitionMsgOffset currentOffset) {
+    _ingestionDelayTracker.updateMetrics(segmentName, partitionId, 
ingestionTimeMs, currentOffset);
   }
 
   /**
@@ -320,7 +313,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    * still be consuming, e.g. when the new consuming segment is created on a 
different server.
    */
   public void removeIngestionMetrics(String segmentName) {
-    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+    _ingestionDelayTracker.stopTrackingPartition(segmentName);
   }
 
   /**
@@ -332,7 +325,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   @Override
   public void onConsumingToDropped(String segmentName) {
     // NOTE: No need to mark segment ignored here because it should have 
already been dropped.
-    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new 
LLCSegmentName(segmentName).getPartitionGroupId());
+    _ingestionDelayTracker.stopTrackingPartition(new 
LLCSegmentName(segmentName).getPartitionGroupId());
   }
 
   /**
@@ -345,7 +338,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
    */
   @Override
   public void onConsumingToOffline(String segmentName) {
-    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+    _ingestionDelayTracker.stopTrackingPartition(segmentName);
   }
 
   @Override
@@ -511,10 +504,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   public Set<Integer> stopTrackingPartitionIngestionDelay(@Nullable 
Set<Integer> partitionIds) {
     if (CollectionUtils.isEmpty(partitionIds)) {
-      return 
_ingestionDelayTracker.stopTrackingIngestionDelayForAllPartitions();
+      return _ingestionDelayTracker.stopTrackingAllPartitions();
     }
     for (Integer partitionId: partitionIds) {
-      _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
+      _ingestionDelayTracker.stopTrackingPartition(partitionId);
     }
     return partitionIds;
   }
@@ -926,6 +919,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     _enforceConsumptionInOrder = enforceConsumptionInOrder;
   }
 
+  public Supplier<Boolean> getIsServerReadyToServeQueries() {
+    return _isServerReadyToServeQueries;
+  }
+
   /**
    * Validate a schema against the table config for real-time record 
consumption.
    * Ideally, we should validate these things when schema is added or table is 
created, but either of these
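
[For multi-topic tables, the tracker maps stream-local partition ids to
table-level ("pinot") partition ids and back via IngestionConfigUtils. A
hedged sketch of one such encoding; the padding constant and helper names are
assumptions for illustration, not necessarily Pinot's internals:

    // Illustrative two-way mapping between (streamConfigIndex,
    // streamPartitionId) and a single table-level partition id.
    final class PartitionIdCodecExample {
      static final int PADDING = 10_000; // assumed per-stream id space

      static int toPinotPartitionId(int streamConfigIndex, int streamPartitionId) {
        return streamConfigIndex * PADDING + streamPartitionId;
      }

      static int toStreamPartitionId(int pinotPartitionId) {
        return pinotPartitionId % PADDING;
      }

      static int toStreamConfigIndex(int pinotPartitionId) {
        return pinotPartitionId / PADDING;
      }
    }
]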
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 1fdd12e00e7..cf25c1d4a1e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -22,53 +22,154 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class IngestionDelayTrackerTest {
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
-  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
+  private static final int METRICS_CLEANUP_TICK_INTERVAL_MS = 100;
+  private static final int METRICS_TRACKING_TICK_INTERVAL_MS = 100;
 
   private final ServerMetrics _serverMetrics = mock(ServerMetrics.class);
-  private final RealtimeTableDataManager _realtimeTableDataManager = 
mock(RealtimeTableDataManager.class);
+  private static final RealtimeTableDataManager REALTIME_TABLE_DATA_MANAGER = 
mock(RealtimeTableDataManager.class);
+
+  static {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName("ts")
+        .setNullHandlingEnabled(true)
+        .setStreamConfigs(getStreamConfigs())
+        .build();
+    
when(REALTIME_TABLE_DATA_MANAGER.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig,
 null));
+  }
+
+  private static Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.factory.class.name",
+        
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+    return streamConfigs;
+  }
+
+  private static class MockIngestionDelayTracker extends IngestionDelayTracker 
{
+
+    private Map<Integer, Map<String, List<Long>>> _partitionToMetricToValues;
+    private ScheduledExecutorService _scheduledExecutorService;
+
+    public MockIngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
+        RealtimeTableDataManager realtimeTableDataManager)
+        throws RuntimeException {
+      this(serverMetrics, tableNameWithType, realtimeTableDataManager, 5, 5, 
() -> true);
+    }
+
+    public MockIngestionDelayTracker(ServerMetrics serverMetrics, String 
tableNameWithType,
+        RealtimeTableDataManager realtimeTableDataManager, int 
timerThreadTickIntervalMs, int metricTrackingIntervalMs,
+        Supplier<Boolean> isServerReadyToServeQueries) {
+      super(serverMetrics, tableNameWithType, realtimeTableDataManager, 
timerThreadTickIntervalMs,
+          metricTrackingIntervalMs, isServerReadyToServeQueries);
+    }
+
+    @Override
+    public void createMetrics(int partitionId) {
+      if (_partitionToMetricToValues == null) {
+        _partitionToMetricToValues = new ConcurrentHashMap<>();
+        _scheduledExecutorService = Executors.newScheduledThreadPool(2);
+      }
+      Map<String, List<Long>> metricToValues = new HashMap<>();
+      _partitionToMetricToValues.put(partitionId, metricToValues);
+      if 
(_streamConfigIndexToStreamMetadataProvider.get(0).supportsOffsetLag()) {
+        
metricToValues.put(ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(), 
new ArrayList<>());
+        
metricToValues.put(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(),
 new ArrayList<>());
+        
metricToValues.put(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(),
 new ArrayList<>());
+      }
+      
metricToValues.put(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName(), new 
ArrayList<>());
+
+      _scheduledExecutorService.scheduleWithFixedDelay(() -> 
_partitionToMetricToValues.compute(partitionId, (k, v) -> {
+        Map<String, List<Long>> metricToValuesForPartition = 
_partitionToMetricToValues.get(partitionId);
+        if 
(_streamConfigIndexToStreamMetadataProvider.get(0).supportsOffsetLag()) {
+          
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName())
+              .add(getPartitionIngestionOffsetLag(partitionId));
+          
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName())
+              .add(getPartitionIngestionConsumingOffset(partitionId));
+          
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName())
+              .add(getLatestPartitionOffset(partitionId));
+        }
+        
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName())
+            .add(getPartitionIngestionDelayMs(partitionId));
+        return metricToValuesForPartition;
+      }), 0, 10, TimeUnit.MILLISECONDS);
+    }
+
+    void updatePartitionIdToLatestOffset(Map<Integer, 
StreamPartitionMsgOffset> partitionIdToLatestOffset) {
+      _partitionIdToLatestOffset = partitionIdToLatestOffset;
+    }
+
+    @Override
+    public void shutdown() {
+      if (_scheduledExecutorService != null) {
+        _scheduledExecutorService.shutdown();
+      }
+      super.shutdown();
+    }
+  }
 
   private IngestionDelayTracker createTracker() {
-    IngestionDelayTracker ingestionDelayTracker =
-        new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
_realtimeTableDataManager, () -> true);
-    // With no samples, the time reported must be zero
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
0);
-    return ingestionDelayTracker;
+    return new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
REALTIME_TABLE_DATA_MANAGER);
   }
 
   @Test
   public void testTrackerConstructors() {
     // Test regular constructor
     IngestionDelayTracker ingestionDelayTracker =
-        new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
_realtimeTableDataManager, () -> true);
+        new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
REALTIME_TABLE_DATA_MANAGER);
 
     Clock clock = Clock.systemUTC();
     ingestionDelayTracker.setClock(clock);
 
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
0);
+    Assert.assertTrue(ingestionDelayTracker.getPartitionIngestionDelayMs(0) <= 
clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
Long.MIN_VALUE);
     ingestionDelayTracker.shutdown();
     // Test constructor with timer arguments
-    ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, 
REALTIME_TABLE_NAME, _realtimeTableDataManager,
-        TIMER_THREAD_TICK_INTERVAL_MS, () -> true);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
0);
+    ingestionDelayTracker =
+        new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
REALTIME_TABLE_DATA_MANAGER,
+            METRICS_CLEANUP_TICK_INTERVAL_MS, 
METRICS_TRACKING_TICK_INTERVAL_MS, () -> true);
+    Assert.assertTrue(ingestionDelayTracker.getPartitionIngestionDelayMs(0) <= 
clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
Long.MIN_VALUE);
     // Test bad timer args to the constructor
     try {
-      new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
_realtimeTableDataManager, 0, () -> true);
+      new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
REALTIME_TABLE_DATA_MANAGER, 0, 0, () -> true);
       Assert.fail("Must have asserted due to invalid arguments"); // 
Constructor must assert
     } catch (Exception e) {
       if ((e instanceof NullPointerException) || !(e instanceof 
RuntimeException)) {
@@ -86,7 +187,7 @@ public class IngestionDelayTrackerTest {
     final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 
234).getSegmentName();
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
-    // Use fixed clock so samples dont age
+    // Use fixed clock so samples don't age
     Instant now = Instant.now();
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
@@ -94,52 +195,37 @@ public class IngestionDelayTrackerTest {
 
     // Test we follow a single partition up and down
     for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; 
ingestionTimeMs++) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment0, partition0, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
           clock.millis() - ingestionTimeMs);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-          clock.millis() - firstStreamIngestionTimeMs);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
 ingestionTimeMs);
     }
 
     // Test tracking down a measure for a given partition
     for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0; 
ingestionTimeMs--) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment0, partition0, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
           clock.millis() - ingestionTimeMs);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-          clock.millis() - (ingestionTimeMs + 1));
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
 ingestionTimeMs);
     }
 
     // Make the current partition maximum
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
maxTestDelay, maxTestDelay, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, maxTestDelay, 
null);
 
     // Bring partition1 delay up and verify values
     for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay; 
 ingestionTimeMs++) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, 
ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment1, partition1, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
           clock.millis() - ingestionTimeMs);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - firstStreamIngestionTimeMs);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
 ingestionTimeMs);
     }
 
     // Bring down values of partition1 and verify values
     for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0; 
ingestionTimeMs--) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, 
ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment1, partition1, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
           clock.millis() - ingestionTimeMs);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - firstStreamIngestionTimeMs);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
 ingestionTimeMs);
     }
 
@@ -168,9 +254,8 @@ public class IngestionDelayTrackerTest {
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
     long ingestionTimeMs = clock.millis() - partition0Delay0;
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, 
null);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
 partition0Delay0);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
 partition0Delay0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
 ingestionTimeMs);
 
     // Advance clock and test aging
@@ -178,14 +263,11 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(offsetClock);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay0 + partition0Offset0Ms));
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-        (partition0Delay0 + partition0Offset0Ms));
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
 ingestionTimeMs);
 
     ingestionTimeMs = offsetClock.millis() - partition0Delay1;
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, 
null);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
 partition0Delay1);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
 partition0Delay1);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0),
 ingestionTimeMs);
 
     // Add some offset to the last sample and make sure we age that measure 
properly
@@ -195,9 +277,8 @@ public class IngestionDelayTrackerTest {
         (partition0Delay1 + partition0Offset1Ms));
 
     ingestionTimeMs = offsetClock.millis() - partition1Delay0;
-    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, 
ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment1, partition1, ingestionTimeMs, 
null);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
 partition1Delay0);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
 partition1Delay0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1),
 ingestionTimeMs);
 
     // Add some offset to the last sample and make sure we age that measure 
properly
@@ -225,19 +306,16 @@ public class IngestionDelayTrackerTest {
     for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 
123).getSegmentName();
       long ingestionTimeMs = clock.millis() - partitionId;
-      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, 
ingestionTimeMs, ingestionTimeMs, null,
-          null);
+      ingestionDelayTracker.updateMetrics(segmentName, partitionId, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
 partitionId);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
 partitionId);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId),
 ingestionTimeMs);
     }
     for (int partitionId = maxPartition; partitionId >= 0; partitionId--) {
-      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
+      ingestionDelayTracker.stopTrackingPartition(partitionId);
     }
     for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
-      // Untracked partitions must return 0
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
 0);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
 0);
+      // Untracked partitions report the current clock time as the delay
+      
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
 clock.millis());
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId),
 Long.MIN_VALUE);
     }
   }
@@ -253,20 +331,17 @@ public class IngestionDelayTrackerTest {
 
     String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
123).getSegmentName();
     long ingestionTimeMs = clock.millis() - 10;
-    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, 
ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segmentName, 0, ingestionTimeMs, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
10);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
 10);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
ingestionTimeMs);
 
-    ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
0);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
 0);
+    ingestionDelayTracker.stopTrackingPartition(segmentName);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
Long.MIN_VALUE);
 
     // Should not update metrics for removed segment
-    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, 
ingestionTimeMs, ingestionTimeMs, null, null);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
0);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0),
 0);
+    ingestionDelayTracker.updateMetrics(segmentName, 0, ingestionTimeMs, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 
clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
Long.MIN_VALUE);
   }
 
@@ -285,10 +360,8 @@ public class IngestionDelayTrackerTest {
     for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 
123).getSegmentName();
       long ingestionTimeMs = clock.millis() - partitionId;
-      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, 
ingestionTimeMs, ingestionTimeMs, null,
-          null);
+      ingestionDelayTracker.updateMetrics(segmentName, partitionId, 
ingestionTimeMs, null);
       
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId),
 partitionId);
-      
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId),
 partitionId);
     }
     ingestionDelayTracker.shutdown();
 
@@ -305,34 +378,154 @@ public class IngestionDelayTrackerTest {
     final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 
234).getSegmentName();
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
+    Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap = new 
HashMap<>();
+    ((MockIngestionDelayTracker) 
ingestionDelayTracker).updatePartitionIdToLatestOffset(partitionMsgOffsetMap);
 
     // Test tracking offset lag for a single partition
     StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffset0);
+    partitionMsgOffsetMap.put(partition0, latestOffset0);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, Long.MIN_VALUE, 
msgOffset0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
 100);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0),
 150);
+    
Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition0), 
150);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0),
 50);
 
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
-        latestOffset1);
+    partitionMsgOffsetMap.put(partition1, latestOffset1);
+    ingestionDelayTracker.updateMetrics(segment1, partition1, Long.MIN_VALUE, 
msgOffset1);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1),
 100);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition1),
 150);
+    
Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition1), 
150);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition1),
 50);
 
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
     latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, 
Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffset0);
+    partitionMsgOffsetMap.put(partition0, latestOffset0);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, Long.MIN_VALUE, 
msgOffset0);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0),
 50);
-    
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0),
 200);
+    
Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition0), 
200);
     
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0),
 150);
 
     ingestionDelayTracker.shutdown();
   }
+
+  @Test
+  public void testUpdateLatestStreamOffset() {
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    Set<Integer> partitionsHosted = new HashSet<>();
+    partitionsHosted.add(0);
+    partitionsHosted.add(1);
+
+    ingestionDelayTracker.updateLatestStreamOffset(partitionsHosted);
+    
Assert.assertEquals(ingestionDelayTracker._partitionIdToLatestOffset.size(), 2);
+    Assert.assertEquals(((LongMsgOffset) 
(ingestionDelayTracker._partitionIdToLatestOffset.get(0))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) 
(ingestionDelayTracker._partitionIdToLatestOffset.get(1))).getOffset(),
+        Integer.MAX_VALUE);
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    List<Map<String, String>> streamConfigMaps = new ArrayList<>();
+    streamConfigMaps.add(getStreamConfigs());
+    streamConfigMaps.add(getStreamConfigs());
+    StreamIngestionConfig streamIngestionConfig = new 
StreamIngestionConfig(streamConfigMaps);
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName("ts")
+        .setNullHandlingEnabled(true)
+        .setIngestionConfig(ingestionConfig)
+        .setStreamConfigs(getStreamConfigs())
+        .build();
+    
when(REALTIME_TABLE_DATA_MANAGER.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig,
 null));
+    ingestionDelayTracker = createTracker();
+    
partitionsHosted.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(0,
 1));
+    ingestionDelayTracker.updateLatestStreamOffset(partitionsHosted);
+    
Assert.assertEquals(ingestionDelayTracker._partitionIdToLatestOffset.size(), 3);
+    Assert.assertEquals(((LongMsgOffset) 
(ingestionDelayTracker._partitionIdToLatestOffset.get(0))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) 
(ingestionDelayTracker._partitionIdToLatestOffset.get(1))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) 
(ingestionDelayTracker._partitionIdToLatestOffset.get(
+        IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(0, 
1)))).getOffset(), Integer.MAX_VALUE);
+  }
+
+  @Test
+  public void testIngestionDelay() {
+    MockIngestionDelayTracker ingestionDelayTracker =
+        new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, 
REALTIME_TABLE_DATA_MANAGER,
+            METRICS_CLEANUP_TICK_INTERVAL_MS, 
METRICS_TRACKING_TICK_INTERVAL_MS, () -> true);
+
+    final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 
123).getSegmentName();
+    final int partition1 = 1;
+
+    ingestionDelayTracker._partitionsHostedByThisServer.add(partition0);
+    ingestionDelayTracker._partitionsHostedByThisServer.add(partition1);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, 
System.currentTimeMillis(), new LongMsgOffset(50));
+
+    ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+    Map<Integer, StreamPartitionMsgOffset> partitionIdVsLatestOffset = new 
HashMap<>();
+    partitionIdVsLatestOffset.put(partition0, new LongMsgOffset(200));
+    partitionIdVsLatestOffset.put(partition1, new LongMsgOffset(200));
+    
ingestionDelayTracker.updatePartitionIdToLatestOffset(partitionIdVsLatestOffset);
+
+    scheduledExecutorService.scheduleWithFixedDelay(() -> {
+      partitionIdVsLatestOffset.put(partition0,
+          new LongMsgOffset(((LongMsgOffset) 
(partitionIdVsLatestOffset.get(partition0))).getOffset() + 50));
+      partitionIdVsLatestOffset.put(partition1,
+          new LongMsgOffset(((LongMsgOffset) 
(partitionIdVsLatestOffset.get(partition1))).getOffset() + 50));
+    }, 0, 10, TimeUnit.MILLISECONDS);
+
+    Map<Integer, Map<String, List<Long>>> partitionToMetricToValues = 
ingestionDelayTracker._partitionToMetricToValues;
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        verifyMetrics(partitionToMetricToValues);
+      } catch (Error e) {
+        return false;
+      }
+      return true;
+    }, 10, 2000, "Failed to verify the ingestion delay metrics.");
+    scheduledExecutorService.shutdown();
+    ingestionDelayTracker.shutdown();
+  }
+
+  private void verifyMetrics(Map<Integer, Map<String, List<Long>>> 
partitionToMetricToValues) {
+    Assert.assertEquals(partitionToMetricToValues.size(), 2);
+    verifyPartition0(partitionToMetricToValues.get(0));
+    verifyPartition1(partitionToMetricToValues.get(1));
+  }
+
+  private void verifyPartition0(Map<String, List<Long>> metrics) {
+    assertMinMax(metrics, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(), 150L, 300L);
+    assertMinMax(metrics, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(), 200L, 350L);
+    assertEqualsFirstAndLast(metrics, 
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(), 50L);
+    assertIncreasing(metrics, 
ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName());
+    
Assert.assertTrue(metrics.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName()).get(0)
 > 0);
+  }
+
+  private void verifyPartition1(Map<String, List<Long>> metrics) {
+    assertMinMax(metrics, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(), 200L, 350L);
+    assertMinMax(metrics, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(), 200L, 350L);
+    assertEqualsFirstAndLast(metrics, 
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(), 0L);
+    assertIncreasing(metrics, 
ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName());
+    
Assert.assertTrue(metrics.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName()).get(0)
 > 0);
+  }
+
+  private void assertMinMax(Map<String, List<Long>> metrics, String key, long 
minFirst, long minLast) {
+    List<Long> values = metrics.get(key);
+    Assert.assertTrue(values.get(0) >= minFirst, key + " first value too 
small");
+    Assert.assertTrue(values.get(values.size() - 1) >= minLast, key + " last 
value too small");
+  }
+
+  private void assertEqualsFirstAndLast(Map<String, List<Long>> metrics, 
String key, long expected) {
+    List<Long> values = metrics.get(key);
+    Assert.assertEquals(values.get(0), expected, key + " first value 
mismatch");
+    Assert.assertEquals(values.get(values.size() - 1), expected, key + " last 
value mismatch");
+  }
+
+  private void assertIncreasing(Map<String, List<Long>> metrics, String key) {
+    List<Long> values = metrics.get(key);
+    Assert.assertTrue(values.get(values.size() - 1) > values.get(0), key + " 
not increasing");
+  }
 }
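
For context on the assertions in testTrackingOffsetLag above: with numeric offsets, the tracker's lag gauge is plain subtraction of the consuming offset from the latest upstream offset. A minimal sketch of that arithmetic using the test's values; everything except LongMsgOffset and getOffset() is illustrative:

    import org.apache.pinot.spi.stream.LongMsgOffset;

    public class OffsetLagSketch {
      public static void main(String[] args) {
        // Values mirror the test: consuming offset 50, latest upstream offset 150
        LongMsgOffset consuming = new LongMsgOffset(50);
        LongMsgOffset latest = new LongMsgOffset(150);
        long offsetLag = latest.getOffset() - consuming.getOffset();
        System.out.println(offsetLag);  // 100, as asserted for partition0
      }
    }
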
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index c59c15a028a..99e3d90749d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,9 +19,12 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -57,8 +60,23 @@ public class FakeStreamMetadataProvider implements 
StreamMetadataProvider {
     }
   }
 
+  @Override
+  public boolean supportsOffsetLag() {
+    return true;
+  }
+
   @Override
   public void close()
       throws IOException {
   }
+
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds,
+      long timeoutMillis) {
+    Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset = new 
HashMap<>();
+    for (Integer partitionId : partitionIds) {
+      partitionIdToLatestOffset.put(partitionId, new 
LongMsgOffset(Integer.MAX_VALUE));
+    }
+    return partitionIdToLatestOffset;
+  }
 }
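
The fake provider pins every requested partition's latest offset at Integer.MAX_VALUE, which is what testUpdateLatestStreamOffset in IngestionDelayTrackerTest asserts against. A usage sketch, assuming the fake's existing StreamConfig constructor and a stream config wired to the fake factory as in that test:

    import java.util.Map;
    import java.util.Set;
    import org.apache.pinot.spi.stream.LongMsgOffset;
    import org.apache.pinot.spi.stream.StreamConfig;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    public class FakeOffsetsSketch {
      static void printLatestOffsets(StreamConfig streamConfig) {
        FakeStreamMetadataProvider provider = new FakeStreamMetadataProvider(streamConfig);
        Map<Integer, StreamPartitionMsgOffset> latest =
            provider.fetchLatestStreamOffset(Set.of(0, 1), 1000L);
        // Every entry is LongMsgOffset(Integer.MAX_VALUE), keeping the tracker test deterministic
        latest.forEach((p, o) -> System.out.println(p + " -> " + ((LongMsgOffset) o).getOffset()));
      }
    }
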
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c3fc185de09..694ca8972ac 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -95,6 +96,28 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    List<TopicPartition> topicPartitions = new 
ArrayList<>(partitionIds.size());
+    for (Integer streamPartition : partitionIds) {
+      topicPartitions.add(new TopicPartition(_topic, streamPartition));
+    }
+    try {
+      Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+          _consumer.endOffsets(topicPartitions, 
Duration.ofMillis(timeoutMillis));
+
+      Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+          new HashMap<>(topicPartitionToLatestOffsetMap.size());
+      for (Map.Entry<TopicPartition, Long> entry : 
topicPartitionToLatestOffsetMap.entrySet()) {
+        partitionIdToLatestOffset.put(entry.getKey().partition(), new 
LongMsgOffset(entry.getValue()));
+      }
+
+      return partitionIdToLatestOffset;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
@@ -188,6 +211,11 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public boolean supportsOffsetLag() {
+    return true;
+  }
+
   @Override
   public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long 
timestampMillis, long timeoutMillis) {
     return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, 
timestampMillis),
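
A note on semantics: Kafka's endOffsets() returns the log-end offset (the offset of the next record to be appended), so a fully caught-up consumer sees zero lag. A caller-side sketch under that assumption; partitionLag and lastConsumedOffset are illustrative names, not part of the patch:

    import java.util.Map;
    import java.util.Set;
    import org.apache.pinot.spi.stream.LongMsgOffset;
    import org.apache.pinot.spi.stream.StreamMetadataProvider;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    public class KafkaLagSketch {
      static long partitionLag(StreamMetadataProvider provider, int partitionId, long lastConsumedOffset) {
        // Fetch the log-end offset for this partition, then subtract what we have consumed
        Map<Integer, StreamPartitionMsgOffset> latest =
            provider.fetchLatestStreamOffset(Set.of(partitionId), 5000L);
        return ((LongMsgOffset) latest.get(partitionId)).getOffset() - lastConsumedOffset;
      }
    }
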
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6e4de8a1a03..664885f4082 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -424,6 +427,22 @@ public class KafkaPartitionLevelConsumerTest {
     assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, 
TEST_TOPIC_3)));
   }
 
+  @Test
+  public void testFetchLatestStreamOffset()
+      throws IOException {
+    StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+    try (KafkaStreamMetadataProvider streamMetadataProvider = new 
KafkaStreamMetadataProvider("clientId",
+        streamConfig)) {
+      Set<Integer> partitions = new HashSet<>();
+      partitions.add(0);
+      partitions.add(1);
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+      Assert.assertEquals(((LongMsgOffset) 
(partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+      Assert.assertEquals(((LongMsgOffset) 
(partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+    }
+  }
+
   @Test
   void testBatchSizeInBytesIsCalculatedCorrectly() {
     TopicPartition topicPartition = new TopicPartition("test-topic", 0);
@@ -457,20 +476,19 @@ public class KafkaPartitionLevelConsumerTest {
     }
 
     FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
-        new FakeKafkaPartitionLevelConsumer("clientId-test", 
getStreamConfig(), 0);
+        new FakeKafkaPartitionLevelConsumer("clientId-test", 
getStreamConfig("test-topic"), 0);
     KafkaMessageBatch kafkaMessageBatch = 
kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
     Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
   }
 
-  private StreamConfig getStreamConfig() {
+  private StreamConfig getStreamConfig(String topicName) {
     String streamType = "kafka";
-    String streamKafkaTopicName = "test-topic";
     String streamKafkaBrokerList = _kafkaBrokerAddress;
     String tableNameWithType = "tableName_REALTIME";
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put("streamType", streamType);
-    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.topic.name", topicName);
     streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
     streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
     streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index a0d58a244ac..36523c54e7a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -95,6 +96,28 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    List<TopicPartition> topicPartitions = new 
ArrayList<>(partitionIds.size());
+    for (Integer streamPartition : partitionIds) {
+      topicPartitions.add(new TopicPartition(_topic, streamPartition));
+    }
+    try {
+      Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+          _consumer.endOffsets(topicPartitions, 
Duration.ofMillis(timeoutMillis));
+
+      Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+          new HashMap<>(topicPartitionToLatestOffsetMap.size());
+      for (Map.Entry<TopicPartition, Long> entry : 
topicPartitionToLatestOffsetMap.entrySet()) {
+        partitionIdToLatestOffset.put(entry.getKey().partition(), new 
LongMsgOffset(entry.getValue()));
+      }
+
+      return partitionIdToLatestOffset;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
@@ -188,6 +211,11 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public boolean supportsOffsetLag() {
+    return true;
+  }
+
   public static class KafkaTopicMetadata implements TopicMetadata {
     private String _name;
 
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index 4a744114398..e9ed3b7df8a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.plugin.stream.kafka30;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -457,20 +460,35 @@ public class KafkaPartitionLevelConsumerTest {
     }
 
     FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
-        new FakeKafkaPartitionLevelConsumer("clientId-test", 
getStreamConfig(), 0);
+        new FakeKafkaPartitionLevelConsumer("clientId-test", 
getStreamConfig("test-topic"), 0);
     KafkaMessageBatch kafkaMessageBatch = 
kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
     Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
   }
 
-  private StreamConfig getStreamConfig() {
+  @Test
+  public void testFetchLatestStreamOffset()
+      throws IOException {
+    StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+    try (KafkaStreamMetadataProvider streamMetadataProvider = new 
KafkaStreamMetadataProvider("clientId",
+        streamConfig)) {
+      Set<Integer> partitions = new HashSet<>();
+      partitions.add(0);
+      partitions.add(1);
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+      Assert.assertEquals(((LongMsgOffset) 
(partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+      Assert.assertEquals(((LongMsgOffset) 
(partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+    }
+  }
+
+  private StreamConfig getStreamConfig(String topicName) {
     String streamType = "kafka";
-    String streamKafkaTopicName = "test-topic";
     String streamKafkaBrokerList = _kafkaBrokerAddress;
     String tableNameWithType = "tableName_REALTIME";
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put("streamType", streamType);
-    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.topic.name", topicName);
     streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
     streamConfigMap.put("stream.kafka.consumer.factory.class.name", 
getKafkaConsumerFactoryName());
     streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index fb24821910a..baa6eeaa610 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -287,6 +287,11 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public boolean supportsOffsetLag() {
+    return false;
+  }
+
   public static class KinesisTopicMetadata implements TopicMetadata {
     private String _name;
 
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
index 5384ec71b1a..657921613bd 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
@@ -202,6 +202,11 @@ public class PulsarStreamMetadataProvider extends 
PulsarPartitionLevelConnection
     }
   }
 
+  @Override
+  public boolean supportsOffsetLag() {
+    return false;
+  }
+
   public static class PulsarTopicMetadata implements TopicMetadata {
     private String _name;
 
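
Kinesis and Pulsar report false here because their positions (shard sequence numbers and MessageIds, respectively) are not numerically subtractable, so an offset-based lag is not meaningful for them. Callers gate on the flag before fetching offsets, as the tracker's metric registration does; a minimal sketch with illustrative names:

    import java.util.Map;
    import java.util.Set;
    import org.apache.pinot.spi.stream.StreamMetadataProvider;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    public class OffsetLagGuardSketch {
      static Map<Integer, StreamPartitionMsgOffset> fetchIfSupported(StreamMetadataProvider provider,
          Set<Integer> partitionIds) {
        if (!provider.supportsOffsetLag()) {
          return Map.of();  // e.g. Kinesis/Pulsar: skip the offset-based gauges entirely
        }
        return provider.fetchLatestStreamOffset(partitionIds, 5000L);
      }
    }
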
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 4f95198c033..0ef42b8dde1 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -53,6 +53,17 @@ public interface StreamMetadataProvider extends Closeable {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Fetches the latest offset for the given set of partition ids.
+   * @param partitionIds partition ids of the stream
+   * @param timeoutMillis fetch timeout in milliseconds
+   * @return the latest {@link StreamPartitionMsgOffset} for each partition id
+   */
+  default Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitionIds,
+      long timeoutMillis) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Fetches the offset for a given partition and offset criteria
    * @param offsetCriteria offset criteria to fetch{@link 
StreamPartitionMsgOffset}.
@@ -152,6 +163,12 @@ public interface StreamMetadataProvider extends Closeable {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * @return true if the stream supports computing ingestion lag by 
subtracting the last consumed offset from the
+   * latest offset.
+   */
+  boolean supportsOffsetLag();
+
   /**
    * Represents the metadata of a topic. This can be used to represent the 
topic name and other metadata in the future.
    */
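
For a stream whose offsets are plain longs, implementing the two new SPI methods inside a provider is mechanical. An illustrative sketch only; fetchUpstreamEndOffset is a hypothetical helper standing in for the stream's admin/consumer API:

    @Override
    public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds,
        long timeoutMillis) {
      Map<Integer, StreamPartitionMsgOffset> latestOffsets = new HashMap<>(partitionIds.size());
      for (int partitionId : partitionIds) {
        // Hypothetical call into the underlying stream's admin/consumer API
        latestOffsets.put(partitionId, new LongMsgOffset(fetchUpstreamEndOffset(partitionId, timeoutMillis)));
      }
      return latestOffsets;
    }

    @Override
    public boolean supportsOffsetLag() {
      return true;  // offsets are longs, so (latest - consumed) is a meaningful lag
    }
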
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index a3841dde9a5..a5dfab2f5d2 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -20,8 +20,10 @@ package org.apache.pinot.spi.utils;
 
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -34,6 +36,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
 
 
 /**
@@ -313,4 +316,25 @@ public final class IngestionConfigUtils {
     }
     return DEFAULT_PUSH_RETRY_INTERVAL_MILLIS;
   }
+
+  /**
+   * Returns a unique client id that stream providers can use.
+   */
+  public static String getTableTopicUniqueClientId(String className, 
StreamConfig streamConfig) {
+    return StreamConsumerFactory.getUniqueClientId(
+        className + "-" + streamConfig.getTableNameWithType() + "-" + 
streamConfig.getTopicName());
+  }
+
+  /**
+   * Returns a map from stream config index to the set of stream partition ids for that stream.
+   * @param pinotPartitionIds set of Pinot partition ids
+   */
+  public static Map<Integer, Set<Integer>> 
getStreamConfigIndexToStreamPartitions(Set<Integer> pinotPartitionIds) {
+    Map<Integer, Set<Integer>> streamIndexToPartitions = new HashMap<>();
+    for (Integer partition : pinotPartitionIds) {
+      
streamIndexToPartitions.computeIfAbsent(getStreamConfigIndexFromPinotPartitionId(partition),
+          k -> new 
HashSet<>()).add(getStreamPartitionIdFromPinotPartitionId(partition));
+    }
+    return streamIndexToPartitions;
+  }
 }
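
The new grouping helper leans on the pinot-partition-id encoding already defined in this class, where the stream config index is folded into the high digits of the id. A worked example mirroring the new unit test:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.pinot.spi.utils.IngestionConfigUtils;

    public class StreamPartitionGroupingSketch {
      public static void main(String[] args) {
        Set<Integer> pinotPartitionIds = new HashSet<>();
        pinotPartitionIds.add(2);  // plain ids map to stream config index 0
        pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(100, 1));
        pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(400, 3));
        Map<Integer, Set<Integer>> byStream =
            IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(pinotPartitionIds);
        System.out.println(byStream);  // {0=[2], 1=[100], 3=[400]}
      }
    }
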
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index 59aec8b3b91..e653ee4b9b1 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -158,4 +160,27 @@ public class IngestionConfigUtilsTest {
     Assert.assertEquals(2, 
IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1").size());
     Assert.assertEquals(2, 
IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1.").size());
   }
+
+  @Test
+  public void testGetStreamConfigIndexToStreamPartitions() {
+    Set<Integer> pinotPartitionIds = new HashSet<>(2);
+    pinotPartitionIds.add(0);
+    pinotPartitionIds.add(1);
+    Map<Integer, Set<Integer>> streamConfigIndexToStreamPartitions =
+        
IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(pinotPartitionIds);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.size(), 1);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(0), new 
HashSet<>(Arrays.asList(0, 1)));
+
+    pinotPartitionIds = new HashSet<>(4);
+    pinotPartitionIds.add(2);
+    
pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(100,
 1));
+    
pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(1,
 1));
+    
pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(400,
 3));
+    streamConfigIndexToStreamPartitions =
+        
IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(pinotPartitionIds);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.size(), 3);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(0), new 
HashSet<>(Arrays.asList(2)));
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new 
HashSet<>(Arrays.asList(100, 1)));
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new 
HashSet<>(Arrays.asList(400)));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
