This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d2c3f54a6 Remove ingestion metrics when consuming segment relocates (#13668)
4d2c3f54a6 is described below

commit 4d2c3f54a64d4607c1d663994327c81896bd80f1
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Aug 13 16:33:53 2024 -0700

    Remove ingestion metrics when consuming segment relocates (#13668)
---
 .../pinot/common/messages/ForceCommitMessage.java  |   2 +-
 .../messages/IngestionMetricsRemoveMessage.java    |  47 ++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  59 ++++-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   3 +-
 .../manager/realtime/IngestionDelayTracker.java    | 290 ++++++++++-----------
 .../realtime/RealtimeSegmentDataManager.java       |  25 +-
 .../manager/realtime/RealtimeTableDataManager.java |  71 ++---
 .../realtime/IngestionDelayTrackerTest.java        | 201 +++++++-------
 .../helix/SegmentMessageHandlerFactory.java        |  26 ++
 9 files changed, 428 insertions(+), 296 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
index 8d9d1b02bd..4535789f42 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
@@ -49,7 +49,7 @@ public class ForceCommitMessage extends Message {
     super(message.getRecord());
     String msgSubType = message.getMsgSubType();
     Preconditions.checkArgument(msgSubType.equals(FORCE_COMMIT_MSG_SUB_TYPE),
-        "Invalid message sub type: " + msgSubType + " for 
SegmentReloadMessage");
+        "Invalid message sub type: " + msgSubType + " for ForceCommitMessage");
   }
 
   public String getTableName() {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java
new file mode 100644
index 0000000000..3c2100a46a
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Ingestion metrics remove message is created on the controller and gets sent to servers to instruct them to remove
+ * ingestion metrics for the stream partition of the given segment when the new consuming segment is no longer served by
+ * the server.
+ */
+public class IngestionMetricsRemoveMessage extends Message {
+  public static final String INGESTION_METRICS_REMOVE_MSG_SUB_TYPE = "INGESTION_METRICS_REMOVE";
+
+  public IngestionMetricsRemoveMessage() {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is alive
+    setExecutionTimeout(-1);
+  }
+
+  public IngestionMetricsRemoveMessage(Message message) {
+    super(message.getRecord());
+    String msgSubType = message.getMsgSubType();
+    Preconditions.checkArgument(msgSubType.equals(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE),
+        "Invalid message sub type: " + msgSubType + " for IngestionMetricsRemoveMessage");
+  }
+}
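
As a minimal sketch of how this new message type is exercised (illustrative only, not part of the patch; the class and method names below are made up for the example): the controller constructs a fresh IngestionMetricsRemoveMessage to route through Helix, and the receiving server re-wraps the raw Helix Message, which validates the sub type.

    // Illustrative sketch only; not part of this commit.
    import org.apache.helix.model.Message;
    import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;

    class IngestionMetricsRemoveMessageSketch {
      // Controller side: build a message to be routed via the ClusterMessagingService.
      IngestionMetricsRemoveMessage build() {
        return new IngestionMetricsRemoveMessage();
      }

      // Server side: re-wrap the generic Helix message; the constructor rejects any other sub type.
      IngestionMetricsRemoveMessage wrap(Message rawMessage) {
        return new IngestionMetricsRemoveMessage(rawMessage);
      }
    }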
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c471277590..bb331f2bc4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -40,6 +41,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -50,6 +52,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.messages.ForceCommitMessage;
+import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -576,8 +579,9 @@ public class PinotLLCRealtimeSegmentManager {
     // to reduce this contention. We may still contend with RetentionManager, or other updates
     // to idealstate from other controllers, but then we have the retry mechanism to get around that.
    synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
-      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName,
-          segmentAssignment, instancePartitionsMap);
+      idealState =
+          updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName,
+              segmentAssignment, instancePartitionsMap);
     }
 
     long endTimeNs = System.nanoTime();
@@ -598,6 +602,12 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
+
+    // Handle segment movement if necessary
+    if (newConsumingSegmentName != null) {
+      handleSegmentMovement(realtimeTableName, idealState.getRecord().getMapFields(), committingSegmentName,
+          newConsumingSegmentName);
+    }
   }
 
   /**
@@ -937,10 +947,10 @@ public class PinotLLCRealtimeSegmentManager {
    * Updates ideal state after completion of a realtime segment
    */
   @VisibleForTesting
-  void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
+  IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
       String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
+    return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
       // When segment completion begins, the zk metadata is updated, followed by ideal state.
       // We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to
@@ -1014,6 +1024,47 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  /**
+   * Handles segment movement between instances.
+   * If the new consuming segment is served by a different set of servers than the committed segment, notify the
+   * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from
+   * emitting high ingestion delay alerts on stream partitions no longer served.
+   */
+  private void handleSegmentMovement(String realtimeTableName, Map<String, Map<String, String>> instanceStatesMap,
+      String committedSegment, String newConsumingSegment) {
+    Set<String> oldInstances = instanceStatesMap.get(committedSegment).keySet();
+    Set<String> newInstances = instanceStatesMap.get(newConsumingSegment).keySet();
+    if (newInstances.containsAll(oldInstances)) {
+      return;
+    }
+    Set<String> instancesNoLongerServe = new HashSet<>(oldInstances);
+    instancesNoLongerServe.removeAll(newInstances);
+    LOGGER.info("Segment movement detected for committed segment: {} (served 
by: {}), "
+            + "consuming segment: {} (served by: {}) in table: {}, "
+            + "sending message to instances: {} to remove ingestion metrics", 
committedSegment, oldInstances,
+        newConsumingSegment, newInstances, realtimeTableName, 
instancesNoLongerServe);
+
+    ClusterMessagingService messagingService = _helixManager.getMessagingService();
+    List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size());
+    for (String instance : instancesNoLongerServe) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName(instance);
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(realtimeTableName);
+      recipientCriteria.setPartition(committedSegment);
+      recipientCriteria.setSessionSpecific(true);
+      IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();
+      if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
+        instancesSent.add(instance);
+      } else {
+        LOGGER.warn("Failed to send ingestion metrics remove message for 
table: {} segment: {} to instance: {}",
+            realtimeTableName, committedSegment, instance);
+      }
+    }
+    LOGGER.info("Sent ingestion metrics remove message for table: {} segment: 
{} to instances: {}", realtimeTableName,
+        committedSegment, instancesSent);
+  }
+
   /*
    *  A segment commit takes 3 modifications to zookeeper:
    *  - Change old segment metadata (mark it DONE, and then other things)
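
To make the instance set difference above concrete, here is a standalone sketch (not part of the patch) of the check handleSegmentMovement performs before sending any message: only instances that hosted the committed segment but do not host the new consuming segment are notified.

    // Standalone sketch of the set difference used by handleSegmentMovement; illustrative only.
    import java.util.HashSet;
    import java.util.Set;

    class SegmentMovementSketch {
      static Set<String> instancesNoLongerServing(Set<String> oldInstances, Set<String> newInstances) {
        if (newInstances.containsAll(oldInstances)) {
          return Set.of();  // no relocation: every old server still hosts the new consuming segment
        }
        Set<String> result = new HashSet<>(oldInstances);
        result.removeAll(newInstances);
        return result;
      }

      public static void main(String[] args) {
        // Committed segment on Server_1/Server_2, new consuming segment on Server_2/Server_3:
        // only Server_1 needs the ingestion metrics remove message.
        System.out.println(instancesNoLongerServing(Set.of("Server_1", "Server_2"), Set.of("Server_2", "Server_3")));
      }
    }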
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 5af7fb92de..16d1983a21 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1212,13 +1212,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
+    IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
         String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
       updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
           segmentAssignment, instancePartitionsMap);
       updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
           segmentAssignment, instancePartitionsMap);
+      return _idealState;
     }
 
     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6953ddaf33..fd31d8f72b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,9 +33,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -81,43 +86,46 @@ import org.slf4j.LoggerFactory;
 
 public class IngestionDelayTracker {
 
-  // Class to wrap supported timestamps collected for an ingested event
-  private static class IngestionTimestamps {
-    private final long _firstStreamIngestionTimeMs;
-    private final long _ingestionTimeMs;
-    IngestionTimestamps(long ingestionTimesMs, long firstStreamIngestionTimeMs) {
-      _ingestionTimeMs = ingestionTimesMs;
-      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
-    }
-  }
+  private static class IngestionInfo {
+    final long _ingestionTimeMs;
+    final long _firstStreamIngestionTimeMs;
+    final StreamPartitionMsgOffset _currentOffset;
+    final StreamPartitionMsgOffset _latestOffset;
 
-  private static class IngestionOffsets {
-    private final StreamPartitionMsgOffset _latestOffset;
-    private final StreamPartitionMsgOffset _offset;
-    IngestionOffsets(StreamPartitionMsgOffset offset, StreamPartitionMsgOffset latestOffset) {
-      _offset = offset;
+    IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
+        @Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) {
+      _ingestionTimeMs = ingestionTimeMs;
+      _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
+      _currentOffset = currentOffset;
       _latestOffset = latestOffset;
     }
   }
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class);
+
  // Sleep interval for scheduled executor service thread that triggers read of ideal state
  private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
  // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 minutes timeouts
  // Delay scheduled executor service for this amount of time after starting service
  private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
-  private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
 
-  // HashMap used to store ingestion time measures for all partitions active for the current table.
-  private final Map<Integer, IngestionTimestamps> _partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
+  // Cache expire time for ignored segment if there is no update from the segment.
+  private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
+
+  // Per partition info for all partitions active for the current table.
+  private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();
 
-  private final Map<Integer, IngestionOffsets> _partitionToOffsetMap = new ConcurrentHashMap<>();
   // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
   // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
   // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
+  // TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay}
+  //       instead.
   private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
 
-  final int _scheduledExecutorThreadTickIntervalMs;
+  private final Cache<String, Boolean> _segmentsToIgnore =
+      CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();
+
   // TODO: Make thread pool a server/cluster level config
  // ScheduledExecutorService to check partitions that are inactive against ideal state.
  private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);
@@ -131,6 +139,7 @@ public class IngestionDelayTracker {
 
   private Clock _clock;
 
+  @VisibleForTesting
  public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
      RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
       Supplier<Boolean> isServerReadyToServeQueries)
@@ -144,9 +153,8 @@ public class IngestionDelayTracker {
     // Handle negative timer values
     if (scheduledExecutorThreadTickIntervalMs <= 0) {
      throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
-              scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
+          scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
     }
-    _scheduledExecutorThreadTickIntervalMs = scheduledExecutorThreadTickIntervalMs;
 
     // ThreadFactory to set the thread's name
     ThreadFactory threadFactory = new ThreadFactory() {
@@ -162,7 +170,7 @@ public class IngestionDelayTracker {
     ((ScheduledThreadPoolExecutor) _scheduledExecutor).setThreadFactory(threadFactory);
 
     _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
-            INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, _scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
+        INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
   }
 
   public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
@@ -188,41 +196,26 @@ public class IngestionDelayTracker {
     return agedIngestionDelayMs;
   }
 
-  private long getPartitionOffsetLag(IngestionOffsets offset) {
-    if (offset == null) {
-      return 0;
-    }
-    StreamPartitionMsgOffset currentOffset = offset._offset;
-    StreamPartitionMsgOffset latestOffset = offset._latestOffset;
-
-    if (currentOffset == null || latestOffset == null) {
-      return 0;
-    }
-
-    // Compute aged delay for current partition
-    // TODO: Support other types of offsets
-    if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
-      return 0;
-    }
-
-    return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
-  }
-
   /*
   * Helper function to be called when we should stop tracking a given partition. Removes the partition from
    * all our maps.
    *
-   * @param partitionGroupId partition ID which we should stop tracking.
+   * @param partitionId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
-    _partitionToIngestionTimestampsMap.remove(partitionGroupId);
-    _partitionToOffsetMap.remove(partitionGroupId);
+  private void removePartitionId(int partitionId) {
+    _ingestionInfoMap.compute(partitionId, (k, v) -> {
+      if (v != null) {
+        // Remove all metrics associated with this partition
+        _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
+        _serverMetrics.removePartitionGauge(_metricName, partitionId,
+            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
+        _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+      }
+      return null;
+    });
+
     // If we are removing a partition we should stop reading its ideal state.
-    _partitionsMarkedForVerification.remove(partitionGroupId);
-    _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
-    _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
-        ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
-    _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+    _partitionsMarkedForVerification.remove(partitionId);
   }
 
   /*
@@ -241,7 +234,6 @@ public class IngestionDelayTracker {
     return partitionsToVerify;
   }
 
-
   /**
    * Function that enable use to set predictable clocks for testing purposes.
    *
@@ -255,80 +247,74 @@ public class IngestionDelayTracker {
   /**
   * Called by RealTimeSegmentDataManagers to update the ingestion delay metrics for a given partition.
    *
-   * @param ingestionTimeMs             ingestion time being recorded.
-   * @param firstStreamIngestionTimeMs  time the event was ingested in the first stage of the ingestion pipeline.
-   * @param msgOffset                   message offset of the event being ingested.
-   * @param latestOffset                latest message offset in the stream.
-   * @param partitionGroupId            partition ID for which the ingestion metrics are being recorded.
+   * @param segmentName name of the consuming segment
+   * @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the segment name)
+   * @param ingestionTimeMs ingestion time of the last consumed message (from {@link RowMetadata})
+   * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
+   *                                   {@link RowMetadata})
+   * @param currentOffset offset of the last consumed message (from {@link RowMetadata})
+   * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
    */
-  public void updateIngestionMetrics(long ingestionTimeMs, long firstStreamIngestionTimeMs,
-      StreamPartitionMsgOffset msgOffset, StreamPartitionMsgOffset latestOffset,
-      int partitionGroupId) {
+  public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
+      long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
+      @Nullable StreamPartitionMsgOffset latestOffset) {
+      @Nullable StreamPartitionMsgOffset latestOffset) {
    if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
       // Do not update the ingestion delay metrics during server startup period
       // or once the table data manager has been shutdown.
       return;
     }
 
-    updateIngestionDelay(ingestionTimeMs, firstStreamIngestionTimeMs, partitionGroupId);
-    updateIngestionOffsets(msgOffset, latestOffset, partitionGroupId);
-
-    // If we are consuming we do not need to track this partition for removal.
-    _partitionsMarkedForVerification.remove(partitionGroupId);
-  }
-
-  public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
-    if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
-      // If stream does not return a valid ingestion timestamps don't publish a metric
+    if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) {
+      // Do not publish metrics if stream does not return valid ingestion time or offset.
       return;
     }
-    IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
-        new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
-    if (previousMeasure == null) {
-      // First time we start tracking a partition we should start tracking it via metric
-      // Only publish the metric if supported by the underlying stream. If not supported the stream
-      // returns Long.MIN_VALUE
-      if (ingestionTimeMs >= 0) {
-        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
-            () -> getPartitionIngestionDelayMs(partitionGroupId));
+
+    _ingestionInfoMap.compute(partitionId, (k, v) -> {
+      if (_segmentsToIgnore.getIfPresent(segmentName) != null) {
+        // Do not update the metrics for the segment that is marked to be ignored.
+        return v;
       }
-      if (firstStreamIngestionTimeMs >= 0) {
-        // Only publish this metric when creation time is supported by the underlying stream
-        // When this timestamp is not supported it always returns the value Long.MIN_VALUE
-        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
-            ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
-            () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
+      if (v == null) {
+        // Add metric when we start tracking a partition. Only publish the metric if supported by the stream.
+        if (ingestionTimeMs > 0) {
+          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
+              () -> getPartitionIngestionDelayMs(partitionId));
+        }
+        if (firstStreamIngestionTimeMs > 0) {
+          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+              ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
+              () -> getPartitionEndToEndIngestionDelayMs(partitionId));
+        }
+        if (currentOffset != null && latestOffset != null) {
+          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
+              () -> getPartitionIngestionOffsetLag(partitionId));
+        }
       }
-    }
-  }
+      return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
+    });
 
-  public void updateIngestionOffsets(StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset,
-      int partitionGroupId) {
-    if ((currentOffset == null)) {
-      // If stream does not return a valid ingestion offset don't publish a metric
-      return;
-    }
-    IngestionOffsets previousMeasure =
-        _partitionToOffsetMap.put(partitionGroupId, new IngestionOffsets(currentOffset, latestOffset));
-    if (previousMeasure == null) {
-      // First time we start tracking a partition we should start tracking it via metric
-      // Only publish the metric if supported by the underlying stream. If not supported the stream
-      // returns Long.MIN_VALUE
-      if (currentOffset != null) {
-        _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
-            ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionGroupId));
-      }
-    }
+    // If we are consuming we do not need to track this partition for removal.
+    _partitionsMarkedForVerification.remove(partitionId);
   }
 
   /*
   * Handle partition removal event. This must be invoked when we stop serving a given partition for
    * this table in the current server.
    *
-   * @param partitionGroupId partition id that we should stop tracking.
+   * @param partitionId partition id that we should stop tracking.
    */
-  public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
-    removePartitionId(partitionGroupId);
+  public void stopTrackingPartitionIngestionDelay(int partitionId) {
+    removePartitionId(partitionId);
+  }
+
+  /**
+   * Stops tracking the partition ingestion delay, and also ignores the updates from the given segment. This is useful
+   * when we want to stop tracking the ingestion delay for a partition when the segment might still be consuming, e.g.
+   * when the new consuming segment is created on a different server.
+   */
+  public void stopTrackingPartitionIngestionDelay(String segmentName) {
+    _segmentsToIgnore.put(segmentName, true);
+    removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
   }
 
   /*
@@ -345,7 +331,7 @@ public class IngestionDelayTracker {
    // Check if we have any partition to verify, else don't make the call to check ideal state as that
     // involves network traffic and may be inefficient.
     List<Integer> partitionsToVerify = getPartitionsToBeVerified();
-    if (partitionsToVerify.size() == 0) {
+    if (partitionsToVerify.isEmpty()) {
      // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state.
       return;
     }
@@ -353,87 +339,81 @@ public class IngestionDelayTracker {
     try {
      partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds();
    } catch (Exception e) {
-      _logger.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType,
+      LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType,
           e.getClass(), e.getMessage());
       return;
     }
-    for (int partitionGroupId : partitionsToVerify) {
-      if (!partitionsHostedByThisServer.contains(partitionGroupId)) {
+    for (int partitionId : partitionsToVerify) {
+      if (!partitionsHostedByThisServer.contains(partitionId)) {
         // Partition is not hosted in this server anymore, stop tracking it
-        removePartitionId(partitionGroupId);
+        removePartitionId(partitionId);
       }
     }
   }
 
-  /*
-   * This function is invoked when a partition goes from CONSUMING to ONLINE, so we can assert whether the
-   * partition is still hosted by this server after some interval of time.
-   *
-   * @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server.
+  /**
+   * This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of
+   * the segment is still hosted by this server after some interval of time.
    */
-  public void markPartitionForVerification(int partitionGroupId) {
-    if (!_isServerReadyToServeQueries.get()) {
-      // Do not update the tracker state during server startup period
+  public void markPartitionForVerification(String segmentName) {
+    if (!_isServerReadyToServeQueries.get() || _segmentsToIgnore.getIfPresent(segmentName) != null) {
+      // Do not update the tracker state during server startup period or if the segment is marked to be ignored
       return;
     }
-    _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
+    _partitionsMarkedForVerification.put(new LLCSegmentName(segmentName).getPartitionGroupId(), _clock.millis());
   }
 
   /*
   * Method to get timestamp used for the ingestion delay for a given partition.
    *
-   * @param partitionGroupId partition for which we are retrieving the delay
+   * @param partitionId partition for which we are retrieving the delay
    *
   * @return ingestion delay timestamp in milliseconds for the given partition ID.
    */
-  public long getPartitionIngestionTimeMs(int partitionGroupId) {
-    // Not protected as this will only be invoked when metric is installed which happens after server ready
-    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
-    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
-      return Long.MIN_VALUE;
-    }
-    return currentMeasure._ingestionTimeMs;
+  public long getPartitionIngestionTimeMs(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    return ingestionInfo != null ? ingestionInfo._ingestionTimeMs : Long.MIN_VALUE;
   }
 
   /*
    * Method to get ingestion delay for a given partition.
    *
-   * @param partitionGroupId partition for which we are retrieving the delay
+   * @param partitionId partition for which we are retrieving the delay
    *
    * @return ingestion delay in milliseconds for the given partition ID.
    */
-  public long getPartitionIngestionDelayMs(int partitionGroupId) {
-    // Not protected as this will only be invoked when metric is installed which happens after server ready
-    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
-    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
-      return 0;
-    }
-    return getIngestionDelayMs(currentMeasure._ingestionTimeMs);
-  }
-
-  public long getPartitionIngestionOffsetLag(int partitionGroupId) {
-    // Not protected as this will only be invoked when metric is installed which happens after server ready
-    IngestionOffsets currentMeasure = _partitionToOffsetMap.get(partitionGroupId);
-    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
-      return 0;
-    }
-    return getPartitionOffsetLag(currentMeasure);
+  public long getPartitionIngestionDelayMs(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    return ingestionInfo != null ? getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
   }
 
   /*
    * Method to get end to end ingestion delay for a given partition.
    *
-   * @param partitionGroupId partition for which we are retrieving the delay
+   * @param partitionId partition for which we are retrieving the delay
    *
   * @return End to end ingestion delay in milliseconds for the given partition ID.
    */
-  public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
-    // Not protected as this will only be invoked when metric is installed which happens after server ready
-    IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId);
-    if (currentMeasure == null) { // Guard just in case we read the metric without initializing it
+  public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    return ingestionInfo != null ? getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
+  }
+
+  public long getPartitionIngestionOffsetLag(int partitionId) {
+    IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+    if (ingestionInfo == null) {
       return 0;
     }
-    return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs);
+    StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+    StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+    if (currentOffset == null || latestOffset == null) {
+      return 0;
+    }
+    // TODO: Support other types of offsets
+    if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
+      return 0;
+    }
+    return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
   }
 
   /*
@@ -448,8 +428,8 @@ public class IngestionDelayTracker {
       return;
     }
     // Remove partitions so their related metrics get uninstalled.
-    for (Map.Entry<Integer, IngestionTimestamps> entry : _partitionToIngestionTimestampsMap.entrySet()) {
-      removePartitionId(entry.getKey());
+    for (Integer partitionId : _ingestionInfoMap.keySet()) {
+      removePartitionId(partitionId);
     }
   }
 }
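
For orientation, a hedged usage sketch of the reworked tracker API (not code from this patch; the segment name and offsets are made-up values, and the tracker is assumed to be an already constructed IngestionDelayTracker): consuming segments report timestamps and offsets through the single updateIngestionMetrics call, and when a partition moves away, the segment-name overload of stopTrackingPartitionIngestionDelay removes the gauges and ignores further updates from that segment.

    // Hedged usage sketch; assumes an already constructed IngestionDelayTracker named tracker.
    import org.apache.pinot.spi.stream.LongMsgOffset;

    class IngestionDelayTrackerUsageSketch {
      void example(IngestionDelayTracker tracker) {
        String segmentName = "testTable__0__0__20240813T0000Z";  // hypothetical LLC segment name
        int partitionId = 0;
        long nowMs = System.currentTimeMillis();
        // Single entry point for the ingestion timestamps and offsets of the last consumed message.
        tracker.updateIngestionMetrics(segmentName, partitionId, nowMs, nowMs,
            new LongMsgOffset(100), new LongMsgOffset(120));
        // 120 - 100 = 20 once the update above has been applied (0 if the server is not yet ready to serve queries).
        long offsetLag = tracker.getPartitionIngestionOffsetLag(partitionId);

        // On relocation, drop the partition gauges and ignore late updates from this segment for a while.
        tracker.stopTrackingPartitionIngestionDelay(segmentName);
      }
    }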
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 5acd5d57ee..648bed6080 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -964,6 +964,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _currentOffset;
   }
 
+  @Nullable
   public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
     return _latestStreamOffsetAtStartupTime;
   }
@@ -1683,22 +1684,27 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _idleTimer.getTimeSinceEventLastConsumedMs();
   }
 
+  @Nullable
   public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) {
     return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog);
   }
 
+  @Nullable
   public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
     return fetchLatestStreamOffset(maxWaitTimeMs, false);
   }
 
+  @Nullable
   public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) {
     return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog);
   }
 
+  @Nullable
   public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs) {
     return fetchEarliestStreamOffset(maxWaitTimeMs, false);
   }
 
+  @Nullable
   private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs,
       boolean useDebugLog) {
     if (_partitionMetadataProvider == null) {
@@ -1797,9 +1803,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
    * Assumes there is a valid instance of {@link PartitionGroupConsumer}
    */
   private void recreateStreamConsumer(String reason) {
-      _segmentLogger.info("Recreating stream consumer for topic partition {}, 
reason: {}", _clientId, reason);
-      _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
-      closePartitionGroupConsumer();
+    _segmentLogger.info("Recreating stream consumer for topic partition {}, 
reason: {}", _clientId, reason);
+    _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
+    closePartitionGroupConsumer();
     try {
       _partitionGroupConsumer =
          _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
@@ -1825,20 +1831,23 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     if (metadata != null) {
       try {
         StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true);
-        _realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(),
-            metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), latestOffset, _partitionGroupId);
+        _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId,
+            metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
+            latestOffset);
       } catch (Exception e) {
         _segmentLogger.warn("Failed to fetch latest offset for updating 
ingestion delay", e);
       }
     }
   }
 
-  /*
+  /**
    * Sets ingestion delay to zero in situations where we are caught up processing events.
+   * TODO: Revisit if we should preserve the offset info.
    */
   private void setIngestionDelayToZero() {
     long currentTimeMs = System.currentTimeMillis();
-    _realtimeTableDataManager.updateIngestionMetrics(currentTimeMs, currentTimeMs, null, null, _partitionGroupId);
+    _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, currentTimeMs, currentTimeMs,
+        null, null);
   }
 
   // This should be done during commit? We may not always commit when we build a segment....
@@ -1878,14 +1887,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   /**
    * Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}.
    *
-   * @param streamConfig The stream config from the table config
    * @param fieldsToRead The fields to read from the source stream
    * @return The initialized StreamMessageDecoder
    */
   private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) {
     String decoderClass = _streamConfig.getDecoderClass();
     try {
-      Map<String, String> decoderProperties = _streamConfig.getDecoderProperties();
       StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass);
       decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
       return decoder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 75e8a4c235..8393da3884 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -74,6 +75,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -262,57 +265,63 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     }
   }
 
-  /*
-   * Method used by RealtimeSegmentManagers to update their partition delays
+  /**
+   * Updates the ingestion metrics for the given partition.
    *
-   * @param ingestionTimeMs Ingestion delay being reported.
-   * @param firstStreamIngestionTimeMs Ingestion time of the first message in the stream.
-   * @param partitionGroupId Partition ID for which delay is being updated.
-   * @param offset last offset received for the partition.
-   * @param latestOffset latest upstream offset for the partition.
+   * @param segmentName name of the consuming segment
+   * @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the segment name)
+   * @param ingestionTimeMs ingestion time of the last consumed message (from {@link RowMetadata})
+   * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
+   *                                   {@link RowMetadata})
+   * @param currentOffset offset of the last consumed message (from {@link RowMetadata})
+   * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
    */
-  public void updateIngestionMetrics(long ingestionTimeMs, long firstStreamIngestionTimeMs,
-      StreamPartitionMsgOffset offset, StreamPartitionMsgOffset latestOffset, int partitionGroupId) {
-    _ingestionDelayTracker.updateIngestionMetrics(ingestionTimeMs, firstStreamIngestionTimeMs, offset, latestOffset,
-        partitionGroupId);
+  public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
+      long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
+      @Nullable StreamPartitionMsgOffset latestOffset) {
+    _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs,
+        currentOffset, latestOffset);
   }
 
-  /*
-   * Method used during query execution (ServerQueryExecutorV1Impl) to get the current timestamp for the ingestion
-   * delay for a partition
-   *
-   * @param segmentNameStr name of segment for which we want the ingestion delay timestamp.
-   * @return timestamp of the ingestion delay for the partition.
+  /**
+   * Returns the ingestion time of the last consumed message for the partition of the given segment. Returns
+   * {@code Long.MIN_VALUE} when it is not available.
    */
-  public long getPartitionIngestionTimeMs(String segmentNameStr) {
-    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-    int partitionGroupId = segmentName.getPartitionGroupId();
-    return _ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId);
+  public long getPartitionIngestionTimeMs(String segmentName) {
+    return _ingestionDelayTracker.getPartitionIngestionTimeMs(new LLCSegmentName(segmentName).getPartitionGroupId());
   }
 
-  /*
+  /**
+   * Removes the ingestion metrics for the partition of the given segment, and also ignores the updates from the given
+   * segment. This is useful when we want to stop tracking the ingestion delay for a partition when the segment might
+   * still be consuming, e.g. when the new consuming segment is created on a different server.
+   */
+  public void removeIngestionMetrics(String segmentName) {
+    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+  }
+
+  /**
    * Method to handle CONSUMING -> DROPPED segment state transitions:
    * We stop tracking partitions whose segments are dropped.
    *
-   * @param segmentNameStr name of segment which is transitioning state.
+   * @param segmentName name of segment which is transitioning state.
    */
   @Override
-  public void onConsumingToDropped(String segmentNameStr) {
-    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId());
+  public void onConsumingToDropped(String segmentName) {
+    // NOTE: No need to mark segment ignored here because it should have already been dropped.
+    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new LLCSegmentName(segmentName).getPartitionGroupId());
   }
 
-  /*
+  /**
    * Method to handle CONSUMING -> ONLINE segment state transitions:
    * We mark partitions for verification against ideal state when we do not see a consuming segment for some time
    * for that partition. The idea is to remove the related metrics when the partition moves from the current server.
    *
-   * @param segmentNameStr name of segment which is transitioning state.
+   * @param segmentName name of segment which is transitioning state.
    */
   @Override
-  public void onConsumingToOnline(String segmentNameStr) {
-    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
-    _ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId());
+  public void onConsumingToOnline(String segmentName) {
+    _ingestionDelayTracker.markPartitionForVerification(segmentName);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 8072731583..9cb527b121 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pinot.core.data.manager.realtime;
 
 import java.time.Clock;
@@ -24,8 +23,10 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -33,7 +34,8 @@ import static org.mockito.Mockito.mock;
 
 
 public class IngestionDelayTrackerTest {
-  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
   private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
 
   private final ServerMetrics _serverMetrics = mock(ServerMetrics.class);
@@ -66,8 +68,7 @@ public class IngestionDelayTrackerTest {
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
     // Test bad timer args to the constructor
     try {
-      new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager,
-          0, () -> true);
+      new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, 0, () -> true);
       Assert.fail("Must have asserted due to invalid arguments"); // Constructor must assert
     } catch (Exception e) {
       if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) {
@@ -80,7 +81,9 @@ public class IngestionDelayTrackerTest {
   public void testRecordIngestionDelayWithNoAging() {
     final long maxTestDelay = 100;
     final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
     final int partition1 = 1;
+    final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
     // Use fixed clock so samples dont age
@@ -90,43 +93,54 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(clock);
 
     // Test we follow a single partition up and down
-    for (long i = 0; i <= maxTestDelay; i++) {
-      long firstStreamIngestionTimeMs = i + 1;
-      ingestionDelayTracker.updateIngestionDelay(i, firstStreamIngestionTimeMs, partition0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+    for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; ingestionTimeMs++) {
+      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs,
+          null, null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
+          clock.millis() - ingestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
           clock.millis() - firstStreamIngestionTimeMs);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), i);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
     }
 
     // Test tracking down a measure for a given partition
-    for (long i = maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i);
+    for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) {
+      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs,
+          null, null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
+          clock.millis() - ingestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-          clock.millis() - (i + 1));
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), i);
+          clock.millis() - (ingestionTimeMs + 1));
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
     }
 
     // Make the current partition maximum
-    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay, partition0);
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, maxTestDelay, maxTestDelay, null, null);
 
     // Bring up partition1 delay up and verify values
-    for (long i = 0; i <= 2 * maxTestDelay; i++) {
-      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+    for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay; ingestionTimeMs++) {
+      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs,
+          null, null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
+          clock.millis() - ingestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - (i + 1));
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), i);
+          clock.millis() - firstStreamIngestionTimeMs);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
     }
 
     // Bring down values of partition1 and verify values
-    for (long i = 2 * maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+    for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) {
+      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs,
+          null, null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
+          clock.millis() - ingestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - (i + 1));
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), i);
+          clock.millis() - firstStreamIngestionTimeMs);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
     }
 
     ingestionDelayTracker.shutdown();
@@ -136,11 +150,13 @@ public class IngestionDelayTrackerTest {
   @Test
   public void testRecordIngestionDelayWithAging() {
     final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
     final long partition0Delay0 = 1000;
     final long partition0Delay1 = 10; // record lower delay to make sure max gets reduced
     final long partition0Offset0Ms = 300;
     final long partition0Offset1Ms = 1000;
     final int partition1 = 1;
+    final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
     final long partition1Delay0 = 11;
     final long partition1Offset0Ms = 150;
 
@@ -151,14 +167,12 @@ public class IngestionDelayTrackerTest {
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
-    long ingestionTimeMs = (clock.millis() - partition0Delay0);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (clock.millis() - partition0Delay0), partition0);
-    ingestionDelayTracker.updateIngestionDelay((clock.millis() - partition0Delay0), (clock.millis() - partition0Delay0),
-        partition0);
+    long ingestionTimeMs = clock.millis() - partition0Delay0;
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
+
     // Advance clock and test aging
     Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
@@ -168,9 +182,8 @@ public class IngestionDelayTrackerTest {
         (partition0Delay0 + partition0Offset0Ms));
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
 
-    ingestionTimeMs = (offsetClock.millis() - partition0Delay1);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (offsetClock.millis() - partition0Delay1), partition0);
+    ingestionTimeMs = offsetClock.millis() - partition0Delay1;
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
@@ -181,9 +194,8 @@ public class IngestionDelayTrackerTest {
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay1 + partition0Offset1Ms));
 
-    ingestionTimeMs = (offsetClock.millis() - partition1Delay0);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (offsetClock.millis() - partition1Delay0), partition1);
+    ingestionTimeMs = offsetClock.millis() - partition1Delay0;
+    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
@@ -210,26 +222,54 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(clock);
 
     // Record a number of partitions with delay equal to partition id
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      long ingestionTimeMs = (clock.millis() - partitionGroupId);
-      ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, ingestionTimeMs, partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
-          partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId), ingestionTimeMs);
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+      String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
+      long ingestionTimeMs = clock.millis() - partitionId;
+      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
+          null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), ingestionTimeMs);
     }
-    for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
-      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
+    for (int partitionId = maxPartition; partitionId >= 0; partitionId--) {
+      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
     }
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       // Untracked partitions must return 0
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId), 0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(
-          partitionGroupId), Long.MIN_VALUE);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), Long.MIN_VALUE);
     }
   }
 
+  @Test
+  public void testStopTrackingIngestionDelayWithSegment() {
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123).getSegmentName();
+    long ingestionTimeMs = clock.millis() - 10;
+    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 10);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 10);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), ingestionTimeMs);
+
+    ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
+
+    // Should not update metrics for removed segment
+    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
+  }
+
   @Test
   public void testShutdown() {
     final long maxTestDelay = 100;
@@ -242,12 +282,13 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(clock);
 
     // Test Shutdown with partitions active
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
-          (clock.millis() - partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
-          partitionGroupId);
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+      String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
+      long ingestionTimeMs = clock.millis() - partitionId;
+      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
+          null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
     }
     ingestionDelayTracker.shutdown();
 
@@ -257,65 +298,35 @@ public class IngestionDelayTrackerTest {
   }
 
   @Test
-  public void testRecordIngestionDelayOffsetWithNoAging() {
+  public void testRecordIngestionDelayOffset() {
     final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
     final int partition1 = 1;
+    final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
-    // Use fixed clock so samples don't age
-    Instant now = Instant.now();
-    ZoneId zoneId = ZoneId.systemDefault();
-    Clock clock = Clock.fixed(now, zoneId);
-    ingestionDelayTracker.setClock(clock);
 
     // Test tracking offset lag for a single partition
     StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
     StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+        latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);
 
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset1, latestOffset1, partition1);
+    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
+        latestOffset1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);
 
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
     latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+        latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);
 
     ingestionDelayTracker.shutdown();
   }
-
-  @Test
-  public void testRecordIngestionDelayOffsetWithAging() {
-    final int partition0 = 0;
-    final long partition0OffsetLag0 = 100;
-    final long partition0OffsetLag1 = 50;
-
-    IngestionDelayTracker ingestionDelayTracker = createTracker();
-
-    // With samples for a single partition, test that sample is aged as expected
-    Instant now = Instant.now();
-    ZoneId zoneId = ZoneId.systemDefault();
-    Clock clock = Clock.fixed(now, zoneId);
-    ingestionDelayTracker.setClock(clock);
-
-    StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
-    StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag0);
-
-    // Update offset lag and test aging
-    msgOffset0 = new LongMsgOffset(150);
-    latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag1);
-
-
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag1);
-    ingestionDelayTracker.shutdown();
-  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 11729316c4..0b7111a75e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -30,6 +30,7 @@ import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.messages.ForceCommitMessage;
+import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableDeletionMessage;
@@ -39,7 +40,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.util.SegmentRefreshSemaphore;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +75,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
       case ForceCommitMessage.FORCE_COMMIT_MSG_SUB_TYPE:
         return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
+      case IngestionMetricsRemoveMessage.INGESTION_METRICS_REMOVE_MSG_SUB_TYPE:
+        return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -229,6 +234,27 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private class IngestionMetricsRemoveMessageHandler extends DefaultMessageHandler {
+
+    IngestionMetricsRemoveMessageHandler(IngestionMetricsRemoveMessage message, ServerMetrics metrics,
+        NotificationContext context) {
+      super(message, metrics, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _logger.info("Handling ingestion metrics remove message for table: {}, segment: {}", _tableNameWithType,
+          _segmentName);
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(_tableNameWithType);
+      if (tableDataManager instanceof RealtimeTableDataManager) {
+        ((RealtimeTableDataManager) tableDataManager).removeIngestionMetrics(_segmentName);
+      }
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;

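A minimal usage sketch of the tracker API exercised by the updated tests, not part of this commit: the call signatures follow the tests above, the package of IngestionDelayTracker is inferred from the file list and the RealtimeTableDataManager import in the diff, the offset classes are assumed to live in org.apache.pinot.spi.stream, and the class and method names in the sketch (IngestionDelayTrackerUsageSketch, recordAndClear) are illustrative only.

import org.apache.pinot.core.data.manager.realtime.IngestionDelayTracker;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

public class IngestionDelayTrackerUsageSketch {
  // Illustrative only: mirrors the call sequence used by the updated tests.
  public static void recordAndClear(IngestionDelayTracker tracker, String segmentName, int partitionId) {
    long ingestionTimeMs = System.currentTimeMillis() - 10;
    StreamPartitionMsgOffset currentOffset = new LongMsgOffset(100);
    StreamPartitionMsgOffset latestOffset = new LongMsgOffset(200);

    // Record ingestion timestamps and offsets for the consuming segment of this partition.
    tracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, currentOffset,
        latestOffset);

    // Derived per-partition gauges read back by the tests.
    long delayMs = tracker.getPartitionIngestionDelayMs(partitionId);
    long offsetLag = tracker.getPartitionIngestionOffsetLag(partitionId);

    // When the consuming segment relocates away from this server, the
    // IngestionMetricsRemoveMessage handler leads to the metrics being dropped.
    tracker.stopTrackingPartitionIngestionDelay(segmentName);
  }
}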

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
