This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a216c5a4f [feature] Add a Tracker class to support aggregate-worst case consumption delay… (#9994)
6a216c5a4f is described below

commit 6a216c5a4fa8cc91dcdbb4c40949d2d2b83e6453
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Mon Jan 9 17:43:28 2023 -0800

    [feature] Add a Tracker class to support aggregate-worst case consumption delay… (#9994)
    
    * Add a Tracker class to support aggregate-worst case consumption delay in Pinot
    
    Early draft for review, no significant unit test or end-to-end testing done yet.
    Missing: code to read ideal state, unit tests.
    -Changed code to not remove but verify a partition when a CONSUMING to OFFLINE transition
    comes
    Tests: make sure server, core tests pass.
    -Added support for reading ideal state
    -Added some unit tests for tracker object
    -Added more unit tests that deal with partition removal.
    -Removed caching the maximum to simplify the code.
    -Use concurrent hash maps instead of synchronized methods.
    -Renamed some variables for consistency
    -Refactored timeout code
    -Fix failing unit tests and a number of renames
    -Use int for partitionGroupIds instead of longs
    -Some renames and minor edits.
    -Move some of the uses of partition id to integers for consistency with previous code.
    Remove use of type cast by adding interfaces to super class.
    
    * Remove initial delay of timer thread from configurable parameters, clean throw message and
    update comments to reflect new method names.
    
    * Added support for per partition metrics and some renames.
    Added support for prefix to metric name so we can add multiple trackers and metrics per table.
    
    * Add code to support disabling emitting per partition or aggregate metric.
    
    * Do per-batch sampling of delay metric and not per event for efficiency reasons.
    Some renames to avoid the word Pinot in the name of ingestion delay.
    
    * Add Navina's code to not sample partition delay during catching up phase.
    
    * Rename Millis to Ms and rearrange final private to private final
    
    * Rename Consumption Delay to ingestion delay, remove unnecessary casts, factor out zeroing out the ingestion delay.
    
    * Use anonymous class for timer tasks, some final variables and add TODO for some issues we noted in the code while working on this.
    
    * Remove checks for _ingestionDelayTracker being null as a number of other things break before if that happens. Rollback method to
    set delay to zero after offline discussion.
    
    * Consolidate segment state transition actions in a single method, some minor edits to comments to reflect we now track per partition ingestion delay
    
    * Roll back change where we use one interface for all segment state transitions, code is simpler with interface per transition, also helps with consistency with other parts of the code.
    
    * Optimization to only check ideal state when we really need to.
    
    * Cleanup some comments with stale references, some minor refactors.
    
    * Consolidate all ingestion delay measures under the same Gauge.
    
    * Eliminate the use of optional aging and fix tests to use fixed clocks and get predictable results.
    
    * Remove the per-partition metric mute and use server ready to serve queries as signal to start collecting metrics.
    
    * Address most of the last round of comments.
    
    * Remove isServerReady code from Segment data provider to keep code simpler, encapsulate in IngestionDelay class instead.
    
    * Use CONSUMING segments to determine hosted partitions instead of ONLINE, getSegmentsInGivenStateForThisInstance returns set instead of list.
    
    * Added a few unit tests for per partition measures, a few comment edits
    
    * Removing aggregate metric and some other things requested to simplify the change.
---
 .../pinot/common/metrics/AbstractMetrics.java      |  31 +++
 .../apache/pinot/common/metrics/ServerGauge.java   |   4 +-
 .../core/data/manager/InstanceDataManager.java     |   8 +
 .../manager/offline/TableDataManagerProvider.java  |  10 +-
 .../manager/realtime/IngestionDelayTracker.java    | 308 +++++++++++++++++++++
 .../realtime/LLRealtimeSegmentDataManager.java     |  19 +-
 .../manager/realtime/RealtimeTableDataManager.java |  68 ++++-
 .../realtime/IngestionDelayTrackerTest.java        | 204 ++++++++++++++
 .../local/data/manager/TableDataManager.java       |  16 ++
 .../local/utils/tablestate/TableStateUtils.java    |  45 ++-
 .../server/starter/helix/BaseServerStarter.java    |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |   9 +-
 .../SegmentOnlineOfflineStateModelFactory.java     |   9 +-
 13 files changed, 715 insertions(+), 20 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 9add8f98e8..e4eb329c7d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -537,6 +537,23 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
   }
 
+  /**
+   * Install a per-partition table gauge if needed.
+   *
+   * @param tableName The table name
+   * @param partitionId The partition ID
+   * @param gauge The gauge to use
+   * @param valueCallback the callback function to be called while reading the metric.
+   */
+  public void addCallbackPartitionGaugeIfNeeded(final String tableName, final int partitionId, final G gauge,
+      final Callable<Long> valueCallback) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName) + "." + partitionId;
+
+    addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
+  }
+
   /**
    * Similar to addCallbackGauge method.
    * This method may be called multiple times, while it will be registered to callback function only once.
@@ -602,6 +619,20 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     removeGauge(fullGaugeName);
   }
 
+
+  /**
+   * Removes a per-partition table gauge given the table name, the partition ID and the gauge.
+   * The add/remove is expected to work correctly in case of being invoked across multiple threads.
+   * @param tableName table name
+   * @param gauge the gauge to be removed
+   */
+  public void removePartitionGauge(final String tableName, final int partitionId, final G gauge) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName) + "." + partitionId;
+    removeGauge(fullGaugeName);
+  }
+
   /**
    * Remove gauge from Pinot metrics.
    * @param gaugeName gauge name
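The per-partition gauge name above is built by plain string concatenation: gauge name, table name, partition ID. The following is a minimal, self-contained sketch of that naming and of the register-once and remove-by-name behaviour. It uses an in-memory map in place of Pinot's real metrics registry. `PartitionGaugeRegistry`, its `read`/`contains` helpers and the gauge name string `"realtimeIngestionDelayMs"` are hypothetical. The table name is used as-is, although the real `getTableName()` may transform it.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the registry behind AbstractMetrics.
// It only mirrors the naming scheme and the "register once, remove by name" behavior.
class PartitionGaugeRegistry {
  private final Map<String, Callable<Long>> _gauges = new ConcurrentHashMap<>();

  // Same shape as the diff: gaugeName + "." + tableName + "." + partitionId.
  // Assumption: the table name is used unchanged (the real getTableName() may map it).
  static String partitionGaugeName(String gaugeName, String tableName, int partitionId) {
    return gaugeName + "." + tableName + "." + partitionId;
  }

  // Registers the callback only if no gauge with this full name exists yet.
  void addCallbackPartitionGaugeIfNeeded(String gaugeName, String tableName, int partitionId,
      Callable<Long> valueCallback) {
    _gauges.putIfAbsent(partitionGaugeName(gaugeName, tableName, partitionId), valueCallback);
  }

  void removePartitionGauge(String gaugeName, String tableName, int partitionId) {
    _gauges.remove(partitionGaugeName(gaugeName, tableName, partitionId));
  }

  // Invokes the callback for a full gauge name, as a metrics reporter would on each read.
  long read(String fullGaugeName) {
    Callable<Long> callback = _gauges.get(fullGaugeName);
    if (callback == null) {
      throw new IllegalArgumentException("No gauge named " + fullGaugeName);
    }
    try {
      return callback.call();
    } catch (Exception e) {
      throw new IllegalStateException("Gauge callback failed for " + fullGaugeName, e);
    }
  }

  boolean contains(String fullGaugeName) {
    return _gauges.containsKey(fullGaugeName);
  }
}
```

Because the callback is evaluated on every read, the metric always reflects the tracker's current (aged) value. No background thread needs to push updates into the gauge.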
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 015e75aae9..7dd52e90ea 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -45,7 +45,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Dedup metrics
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
-  JVM_HEAP_USED_BYTES("bytes", true);
+  JVM_HEAP_USED_BYTES("bytes", true),
+  // Ingestion delay metric
+  REALTIME_INGESTION_DELAY_MS("milliseconds", false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 77cab8135a..0e041021d2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager;
 import java.io.File;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
@@ -183,4 +184,11 @@ public interface InstanceDataManager {
    * Immediately stop consumption and start committing the consuming segments.
    */
   void forceCommit(String tableNameWithType, Set<String> segmentNames);
+
+  /**
+   * Enables the installation of a method to determine if a server is ready to serve queries.
+   *
+   * @param isServerReadyToServeQueries supplier to retrieve state of server.
+   */
+  void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServerReadyToServeQueries);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 15f284ae09..2a2671047a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline;
 
 import com.google.common.cache.LoadingCache;
 import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -54,6 +55,13 @@ public class TableDataManagerProvider {
  public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
+    return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
+        errorCache, () -> true);
+  }
+
+  public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
+      ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
+      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
     switch (tableDataManagerConfig.getTableType()) {
       case OFFLINE:
@@ -64,7 +72,7 @@ public class TableDataManagerProvider {
         }
         break;
       case REALTIME:
-        tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore);
+        tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
         break;
       default:
         throw new IllegalStateException();
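The new overloads default the readiness supplier to `() -> true`. `IngestionDelayTracker.updateIngestionDelay()` returns early while the supplier reports `false`. The sketch below shows that gating pattern in isolation. `ReadinessGatedRecorder`, `record`, `lastDelayMs` and `fromFlag` are hypothetical names. The `AtomicBoolean` flag is an assumed stand-in for however the server actually tracks readiness.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical component that receives a readiness supplier, like RealtimeTableDataManager does.
class ReadinessGatedRecorder {
  private final Supplier<Boolean> _isServerReadyToServeQueries;
  private final AtomicLong _lastDelayMs = new AtomicLong(-1); // -1 means "nothing recorded yet"

  // Backward-compatible constructor: behaves as if the server were always ready,
  // mirroring the "() -> true" default used by the existing overloads.
  ReadinessGatedRecorder() {
    this(() -> true);
  }

  ReadinessGatedRecorder(Supplier<Boolean> isServerReadyToServeQueries) {
    _isServerReadyToServeQueries = isServerReadyToServeQueries;
  }

  // Drops samples until the supplier reports readiness; returns whether the sample was kept.
  boolean record(long delayMs) {
    if (!_isServerReadyToServeQueries.get()) {
      return false;
    }
    _lastDelayMs.set(delayMs);
    return true;
  }

  long lastDelayMs() {
    return _lastDelayMs.get();
  }

  // Late-bound readiness: the supplier reads a flag that is flipped once startup completes.
  static Supplier<Boolean> fromFlag(AtomicBoolean ready) {
    return ready::get;
  }
}
```

Passing a `Supplier` rather than a `boolean` lets the table data manager be created during startup and still observe the readiness change later, without holding a reference to the server starter.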
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
new file mode 100644
index 0000000000..3031aee91b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A class to track realtime ingestion delay for a given table on a given server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding
+ *   RealtimeTableDataManager.
+ * 4-A separate metric is associated with each partition being tracked; no aggregate (maximum) metric is
+ *   emitted.
+ * 5-Delays reported for partitions that do not have events to consume are reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay can be increased when partition stops
+ *   consuming for any reason other than no events being available for consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud
+ *   delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
+ *   If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
+ *   partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
+ *  state.
+ *
+ *  The following diagram illustrates the object interactions with main external APIs
+ *
+ *     (CONSUMING -> ONLINE state change)
+ *             |
+ *      markPartitionForVerification(partitionId)
+ *            |                         |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0)}
+ *            |                         |
+ * ___________V_________________________V_
+ * |           (Table X)                |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1)}
+ * | IngestionDelayTracker              |           ...
+ * |____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition n)}
+ *              ^                      ^
+ *              |                       \
+ *   timeoutInactivePartitions()    stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|__________                \
+ *   | TimerTrackingTask |          (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ * TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug.
+ */
+
+public class IngestionDelayTracker {
+
+  // Sleep interval for timer thread that triggers read of ideal state
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
+  // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
+  private static final int PARTITION_TIMEOUT_MS = 600000;          // 10 minutes timeouts
+  // Delay Timer thread for this amount of time after starting timer
+  private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+  private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
+
+  /*
+   * Class to keep an ingestion delay measure and the time when the sample was taken (i.e. sample time)
+   * We will use the sample time to increase ingestion delay when a partition stops consuming: the time
+   * difference between the sample time and current time will be added to the metric when read.
+   */
+  private static class DelayMeasure {
+    public DelayMeasure(long t, long d) {
+      _delayMs = d;
+      _sampleTime = t;
+    }
+    public final long _delayMs;
+    public final long _sampleTime;
+  }
+
+  // HashMap used to store delay measures for all partitions active for the current table.
+  private final ConcurrentHashMap<Integer, DelayMeasure> _partitionToDelaySampleMap = new ConcurrentHashMap<>();
+  // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
+  // go back to CONSUMING in some period of time, we confirm whether they are still hosted in this server by reading
+  // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
+  private final ConcurrentHashMap<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+  final int _timerThreadTickIntervalMs;
+  // Timer task to check partitions that are inactive against ideal state.
+  private final Timer _timer;
+
+  private final ServerMetrics _serverMetrics;
+  private final String _tableNameWithType;
+  private final String _metricName;
+
+  private final RealtimeTableDataManager _realTimeTableDataManager;
+  private final Supplier<Boolean> _isServerReadyToServeQueries;
+
+  private Clock _clock;
+
+  public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
+      RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs,
+      Supplier<Boolean> isServerReadyToServeQueries)
+      throws RuntimeException {
+    _serverMetrics = serverMetrics;
+    _tableNameWithType = tableNameWithType;
+    _metricName = tableNameWithType;
+    _realTimeTableDataManager = realtimeTableDataManager;
+    _clock = Clock.systemUTC();
+    _isServerReadyToServeQueries = isServerReadyToServeQueries;
+    // Reject non-positive timer intervals
+    if (timerThreadTickIntervalMs <= 0) {
+      throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
+          timerThreadTickIntervalMs, _tableNameWithType));
+    }
+    _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
+    _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
+    _timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        timeoutInactivePartitions();
+      }
+    }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+  }
+
+  public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
+      RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
+    this(serverMetrics, tableNameWithType, tableDataManager, TIMER_THREAD_TICK_INTERVAL_MS,
+        isServerReadyToServeQueries);
+  }
+
+  /*
+   * Helper function to age a delay measure. Aging means adding the time elapsed since the measure was
+   * taken until the measure is reported.
+   *
+   * @param currentDelay original sample delay to which we will add the age of the measure.
+   */
+  private long getAgedDelay(DelayMeasure currentDelay) {
+    if (currentDelay == null) {
+      return 0; // return 0 when not initialized
+    }
+    // Add age of measure to the reported value
+    long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+    // Correct to zero for any time shifts due to NTP or time reset.
+    measureAgeInMs = Math.max(measureAgeInMs, 0);
+    return currentDelay._delayMs + measureAgeInMs;
+  }
+
+  /*
+   * Helper function to be called when we should stop tracking a given partition. Removes the partition from
+   * all our maps.
+   *
+   * @param partitionGroupId partition ID which we should stop tracking.
+   */
+  private void removePartitionId(int partitionGroupId) {
+    _partitionToDelaySampleMap.remove(partitionGroupId);
+    // If we are removing a partition we should stop reading its ideal state.
+    _partitionsMarkedForVerification.remove(partitionGroupId);
+    _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
+  }
+
+  /*
+   * Helper function that creates a list of all the partitions that are marked for verification and whose
+   * timeouts are expired. This helps us optimize checks of the ideal state.
+   */
+  private ArrayList<Integer> getPartitionsToBeVerified() {
+    ArrayList<Integer> partitionsToVerify = new ArrayList<>();
+    for (ConcurrentHashMap.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
+      long timeMarked = _clock.millis() - entry.getValue();
+      if (timeMarked > PARTITION_TIMEOUT_MS) {
+        // Partition must be verified
+        partitionsToVerify.add(entry.getKey());
+      }
+    }
+    return partitionsToVerify;
+  }
+
+
+  /**
+   * Function that enables us to set predictable clocks for testing purposes.
+   *
+   * @param clock clock to be used by the class
+   */
+  @VisibleForTesting
+  void setClock(Clock clock) {
+    _clock = clock;
+  }
+
+  /*
+   * Called by LLRealtimeSegmentDataManagers to post delay updates to this tracker class.
+   *
+   * @param delayMs ingestion delay being recorded.
+   * @param sampleTime sample time.
+   * @param partitionGroupId partition ID for which this delay is being recorded.
+   */
+  public void updateIngestionDelay(long delayMs, long sampleTime, int partitionGroupId) {
+    // Store new measure and wipe old one for this partition
+    // TODO: see if we can install gauges after the server is ready.
+    if (!_isServerReadyToServeQueries.get()) {
+      // Do not update the ingestion delay metrics during server startup period
+      return;
+    }
+    DelayMeasure previousMeasure = _partitionToDelaySampleMap.put(partitionGroupId,
+        new DelayMeasure(sampleTime, delayMs));
+    if (previousMeasure == null) {
+      // First time we start tracking a partition we should start tracking it via metric
+      _serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName, partitionGroupId,
+          ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelay(partitionGroupId));
+    }
+    // If we are consuming we do not need to track this partition for removal.
+    _partitionsMarkedForVerification.remove(partitionGroupId);
+  }
+
+  /*
+   * Handle partition removal event. This must be invoked when we stop serving a given partition for
+   * this table in the current server.
+   *
+   * @param partitionGroupId partition id that we should stop tracking.
+   */
+  public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
+    removePartitionId(partitionGroupId);
+  }
+
+  /*
+   * This method is used for timing out inactive partitions, so we don't display their metrics on current server.
+   * When the inactive time exceeds some threshold, we read from ideal state to confirm we still host the partition,
+   * if not we remove the partition from being tracked locally.
+   * This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
+   */
+  public void timeoutInactivePartitions() {
+    Set<Integer> partitionsHostedByThisServer = null;
+    // Check if we have any partition to verify, else don't make the call to check ideal state as that
+    // involves network traffic and may be inefficient.
+    ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified();
+    if (partitionsToVerify.size() == 0) {
+      // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state.
+      return;
+    }
+    try {
+      partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds();
+    } catch (Exception e) {
+      _logger.error("Failed to get partitions hosted by this server, table={}", _tableNameWithType, e);
+      return;
+    }
+    for (int partitionGroupId : partitionsToVerify) {
+      if (!partitionsHostedByThisServer.contains(partitionGroupId)) {
+        // Partition is not hosted in this server anymore, stop tracking it
+        removePartitionId(partitionGroupId);
+      }
+    }
+  }
+
+  /*
+   * This function is invoked when a partition goes from CONSUMING to ONLINE, so we can assert whether the
+   * partition is still hosted by this server after some interval of time.
+   *
+   * @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server.
+   */
+  public void markPartitionForVerification(int partitionGroupId) {
+    _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
+  }
+
+  /*
+   * Method to get ingestion delay for a given partition.
+   *
+   * @param partitionGroupId partition for which we are retrieving the delay
+   *
+   * @return ingestion delay in milliseconds for the given partition ID.
+   */
+  public long getPartitionIngestionDelay(int partitionGroupId) {
+    DelayMeasure currentMeasure = _partitionToDelaySampleMap.get(partitionGroupId);
+    return getAgedDelay(currentMeasure);
+  }
+
+  /*
+   * We use this method to clean up when a table is being removed. No updates are expected at this time
+   * as all LLRealtimeSegmentDataManagers should be down now.
+   */
+  public void shutdown() {
+    // Now that segments can't report metric, destroy metric for this table
+    _timer.cancel();
+    // Remove partitions so their related metrics get uninstalled.
+    for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry : _partitionToDelaySampleMap.entrySet()) {
+      removePartitionId(entry.getKey());
+    }
+  }
+}
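The aging rule and the verification timeout in `IngestionDelayTracker` can be exercised without the timer thread or metrics. The following is a simplified sketch under that assumption, not the committed class. `DelayBookkeeping` and `fixedAt` are hypothetical names. The ideal-state lookup is omitted. `Clock.fixed` is used in the same spirit as the tracker's `setClock` test hook. The 10-minute timeout value is taken from the diff.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of the tracker's bookkeeping: no Timer thread,
// no metrics and no ideal-state reads, only the two maps and the aging/timeout arithmetic.
class DelayBookkeeping {
  static final long PARTITION_TIMEOUT_MS = 600_000; // same 10-minute timeout as in the diff

  private static final class DelayMeasure {
    final long _sampleTimeMs;
    final long _delayMs;

    DelayMeasure(long sampleTimeMs, long delayMs) {
      _sampleTimeMs = sampleTimeMs;
      _delayMs = delayMs;
    }
  }

  private final Map<Integer, DelayMeasure> _delays = new ConcurrentHashMap<>();
  private final Map<Integer, Long> _markedForVerification = new ConcurrentHashMap<>();
  private volatile Clock _clock;

  DelayBookkeeping(Clock clock) {
    _clock = clock;
  }

  void setClock(Clock clock) {
    _clock = clock;
  }

  // A fresh sample replaces the old one and cancels any pending verification.
  void updateIngestionDelay(long delayMs, long sampleTimeMs, int partitionId) {
    _delays.put(partitionId, new DelayMeasure(sampleTimeMs, delayMs));
    _markedForVerification.remove(partitionId);
  }

  void markPartitionForVerification(int partitionId) {
    _markedForVerification.put(partitionId, _clock.millis());
  }

  // Aged delay = recorded delay + time elapsed since the sample (clamped at 0 for clock jumps).
  long getPartitionIngestionDelay(int partitionId) {
    DelayMeasure measure = _delays.get(partitionId);
    if (measure == null) {
      return 0;
    }
    return measure._delayMs + Math.max(_clock.millis() - measure._sampleTimeMs, 0);
  }

  // Partitions whose verification mark is strictly older than the timeout.
  List<Integer> getPartitionsToBeVerified() {
    long now = _clock.millis();
    List<Integer> result = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : _markedForVerification.entrySet()) {
      if (now - entry.getValue() > PARTITION_TIMEOUT_MS) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  static Clock fixedAt(long epochMs) {
    return Clock.fixed(Instant.ofEpochMilli(epochMs), ZoneOffset.UTC);
  }
}
```

For example, a 200 ms sample taken at t=1,000,000 reads as 5,200 ms once the clock reaches t=1,005,000. That is how a stalled partition's metric keeps growing even though no new samples arrive.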
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 2f38640568..fe7c0f30b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -600,12 +600,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           }
         }
       }
-
       _currentOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
       _numRowsConsumed++;
       streamMessageCount++;
     }
+    updateIngestionDelay(indexedMessageCount);
     updateCurrentDocumentCountMetrics();
     if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
       _hasMessagesFetched = true;
@@ -617,6 +617,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
       }
+      // Record Pinot ingestion delay as zero since we are up-to-date and there are no new events
+      _realtimeTableDataManager.updateIngestionDelay(0, System.currentTimeMillis(), _partitionGroupId);
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
@@ -1560,6 +1562,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
 
+  /*
+   * Updates the ingestion delay if messages were processed, using the timestamp of the last consumed event.
+   *
+   * @param indexedMessageCount number of messages indexed in the current batch
+   */
+  private void updateIngestionDelay(int indexedMessageCount) {
+    if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
+      long ingestionDelayMs = _lastConsumedTimestampMs - _lastRowMetadata.getRecordIngestionTimeMs();
+      ingestionDelayMs = Math.max(ingestionDelayMs, 0);
+      // Record Ingestion delay for this partition
+      _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs, _lastConsumedTimestampMs,
+          _partitionGroupId);
+    }
+  }
+
   // This should be done during commit? We may not always commit when we build a segment....
   // TODO Call this method when we are loading the segment, which we do from table datamanager afaik
   private void updateCurrentDocumentCountMetrics() {
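The delay is sampled once per fetched batch rather than per event, and an empty fetch is recorded as zero delay. The sketch below condenses those two call sites into one pure function for illustration only. `BatchDelay` and `delayToReport` are hypothetical. It assumes `_lastConsumedTimestampMs` is the local time associated with consuming the batch. It also simplifies the empty-fetch condition to "no unfiltered messages".

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of what LLRealtimeSegmentDataManager reports per fetch.
final class BatchDelay {
  private BatchDelay() {
  }

  /**
   * @param unfilteredMessageCount messages fetched from the stream in this batch
   * @param indexedMessageCount messages indexed from this batch
   * @param lastConsumedTimestampMs local time associated with the batch consumption
   * @param lastRecordIngestionTimeMs ingestion time of the last row, or null when no row metadata exists
   * @return the delay to report, or empty when this batch should not update the tracker
   */
  static OptionalLong delayToReport(int unfilteredMessageCount, int indexedMessageCount,
      long lastConsumedTimestampMs, Long lastRecordIngestionTimeMs) {
    if (unfilteredMessageCount == 0) {
      // Empty fetch: the partition is caught up, so report zero delay.
      return OptionalLong.of(0);
    }
    if (indexedMessageCount > 0 && lastRecordIngestionTimeMs != null) {
      // One sample per batch, clamped at zero in case stream timestamps are ahead of the local clock.
      return OptionalLong.of(Math.max(lastConsumedTimestampMs - lastRecordIngestionTimeMs, 0));
    }
    // Messages fetched but none indexed (e.g. all filtered), or no metadata: leave the tracker unchanged.
    return OptionalLong.empty();
  }
}
```

Sampling per batch keeps the tracker's `ConcurrentHashMap` write off the per-row path. The cost is that the reported delay reflects only the last row of each batch.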
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 89b45f1cce..fb53de64ea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -22,8 +22,10 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -115,15 +118,25 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private TableUpsertMetadataManager _tableUpsertMetadataManager;
+  // Object to track ingestion delay for all partitions
+  private IngestionDelayTracker _ingestionDelayTracker;
+  private final Supplier<Boolean> _isServerReadyToServeQueries;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+    this(segmentBuildSemaphore, () -> true);
+  }
+
+  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, Supplier<Boolean> isServerReadyToServeQueries) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
+    _isServerReadyToServeQueries = isServerReadyToServeQueries;
   }
 
   @Override
   protected void doInit() {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
-
+    // Tracks ingestion delay of all partitions being served for this table
+    _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this,
+        _isServerReadyToServeQueries);
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
       _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -212,6 +225,59 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     if (_leaseExtender != null) {
       _leaseExtender.shutDown();
     }
+    // Make sure we do metric cleanup when we shut down the table.
+    _ingestionDelayTracker.shutdown();
+  }
+
+  /*
+   * Method used by LLRealtimeSegmentDataManagers to update their partition delays
+   *
+   * @param ingestionDelayMs Ingestion delay being reported.
+   * @param currentTimeMs Timestamp of the measure being provided, i.e. when this delay was computed.
+   * @param partitionGroupId Partition ID for which delay is being updated.
+   */
+  public void updateIngestionDelay(long ingestionDelayMs, long currentTimeMs, int partitionGroupId) {
+    _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, currentTimeMs, partitionGroupId);
+  }
+
+  /**
+   * Handles CONSUMING -> DROPPED segment state transitions:
+   * we stop tracking the partition whose consuming segment is dropped.
+   *
+   * @param segmentNameStr name of the segment that is transitioning state.
+   */
+  @Override
+  public void onConsumingToDropped(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId());
+  }
+
+  /**
+   * Handles CONSUMING -> ONLINE segment state transitions:
+   * we mark the partition for verification against the ideal state, so that if no consuming segment shows up
+   * for that partition for some time, its metrics can be removed once the partition has moved off this server.
+   *
+   * @param segmentNameStr name of the segment that is transitioning state.
+   */
+  @Override
+  public void onConsumingToOnline(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    _ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId());
+  }
+
+  /**
+   * Returns the partitionGroupIds of all partitions of this table with a CONSUMING segment assigned to this server.
+   * Note: this involves a ZooKeeper read and should not be called frequently due to efficiency concerns.
+   */
+  public Set<Integer> getHostedPartitionsGroupIds() {
+    Set<Integer> partitionsHostedByThisServer = new HashSet<>();
+    List<String> segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType,
+        CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+    for (String segmentNameStr : segments) {
+      LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+      partitionsHostedByThisServer.add(segmentName.getPartitionGroupId());
+    }
+    return partitionsHostedByThisServer;
   }
 
   public RealtimeSegmentStatsHistory getStatsHistory() {
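`updateIngestionDelay`, `onConsumingToDropped`, and `onConsumingToOnline` only delegate to `IngestionDelayTracker`, whose source is not part of this excerpt. The sketch below shows one way the per-partition bookkeeping could work, assuming the contract exercised by `IngestionDelayTrackerTest` in this commit: the reported delay is the last sample plus the time elapsed since it was taken, and untracked partitions report 0. `SimpleIngestionDelayTracker` and `getMaxIngestionDelay` are hypothetical names; timers, metric emission, and ideal-state verification are omitted.

```java
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for IngestionDelayTracker (not Pinot's class).
// No timer thread, no metrics, no ideal-state verification: only per-partition bookkeeping.
class SimpleIngestionDelayTracker {
  // Last delay reported for a partition and the wall-clock time at which it was computed.
  private static final class DelaySample {
    final long _delayMs;
    final long _sampleTimeMs;

    DelaySample(long delayMs, long sampleTimeMs) {
      _delayMs = delayMs;
      _sampleTimeMs = sampleTimeMs;
    }
  }

  private final Map<Integer, DelaySample> _samples = new ConcurrentHashMap<>();
  private volatile Clock _clock = Clock.systemUTC();

  // Injectable clock so tests can freeze or advance time (Clock.fixed / Clock.offset).
  void setClock(Clock clock) {
    _clock = clock;
  }

  // Replaces the previous sample for the partition; ConcurrentHashMap makes this safe without locks.
  void updateIngestionDelay(long delayMs, long sampleTimeMs, int partitionGroupId) {
    _samples.put(partitionGroupId, new DelaySample(delayMs, sampleTimeMs));
  }

  // Ages the last sample: if no new sample arrives, the reported delay keeps growing.
  // Untracked partitions report 0.
  long getPartitionIngestionDelay(int partitionGroupId) {
    DelaySample sample = _samples.get(partitionGroupId);
    if (sample == null) {
      return 0;
    }
    return sample._delayMs + (_clock.millis() - sample._sampleTimeMs);
  }

  // Aggregate worst case across all tracked partitions, recomputed on demand (no cached maximum).
  long getMaxIngestionDelay() {
    long max = 0;
    for (int partitionGroupId : _samples.keySet()) {
      max = Math.max(max, getPartitionIngestionDelay(partitionGroupId));
    }
    return max;
  }

  void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
    _samples.remove(partitionGroupId);
  }

  public static void main(String[] args) {
    SimpleIngestionDelayTracker tracker = new SimpleIngestionDelayTracker();
    long now = System.currentTimeMillis();
    tracker.updateIngestionDelay(250, now, 0);
    tracker.updateIngestionDelay(40, now, 1);
    System.out.println("worst-case delay ms: " + tracker.getMaxIngestionDelay());
  }
}
```

Recomputing the maximum on each read instead of caching it keeps updates and removals trivially consistent, at the cost of one scan over the (usually small) set of partitions per read.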
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
new file mode 100644
index 0000000000..b91d714e11
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IngestionDelayTrackerTest {
+
+  private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
+
+  private IngestionDelayTracker createTracker() {
+    ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    RealtimeTableDataManager realtimeTableDataManager = new RealtimeTableDataManager(null);
+    IngestionDelayTracker ingestionDelayTracker =
+        new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+            realtimeTableDataManager, () -> true);
+    // With no samples, the reported delay must be zero
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    return ingestionDelayTracker;
+  }
+
+  @Test
+  public void testTrackerConstructors() {
+    ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    RealtimeTableDataManager realtimeTableDataManager = new RealtimeTableDataManager(null);
+    // Test regular constructor
+    IngestionDelayTracker ingestionDelayTracker =
+        new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+            realtimeTableDataManager, () -> true);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    ingestionDelayTracker.shutdown();
+    // Test constructor with timer arguments
+    ingestionDelayTracker =
+        new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+            realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () -> true);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0);
+    // Test bad timer args to the constructor
+    try {
+      ingestionDelayTracker =
+          new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+              realtimeTableDataManager, 0, () -> true);
+      Assert.fail("Constructor must throw for a zero timer interval");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof RuntimeException);
+    }
+  }
+
+  @Test
+  public void testRecordIngestionDelayWithNoAging() {
+    final long maxTestDelay = 100;
+    final int partition0 = 0;
+    final int partition1 = 1;
+
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    // Follow a single partition up; samples are stamped at time 0, so each one is aged by clock.millis()
+    for (long i = 0; i <= maxTestDelay; i++) {
+      ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+    }
+
+    // Follow the same partition back down
+    for (long i = maxTestDelay; i >= 0; i--) {
+      ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis());
+    }
+
+    // Make partition0's delay the current maximum
+    ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0);
+
+    // Bring partition1's delay up and verify values
+    for (long i = 0; i <= 2 * maxTestDelay; i++) {
+      ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+    }
+
+    // Bring partition1's delay down and verify values
+    for (long i = 2 * maxTestDelay; i >= 0; i--) {
+      ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis());
+    }
+
+    ingestionDelayTracker.shutdown();
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testRecordIngestionDelayWithAging() {
+    final int partition0 = 0;
+    final long partition0Delay0 = 1000;
+    final long partition0Delay1 = 10; // record lower delay to make sure max gets reduced
+    final long partition0Offset0Ms = 300;
+    final long partition0Offset1Ms = 1000;
+    final int partition1 = 1;
+    final long partition1Delay0 = 11; // Record something slightly higher than previous max
+    final long partition1Delay1 = 8;  // Record something lower so that partition0 is the current max again
+    final long partition1Offset0Ms = 150;
+    final long partition1Offset1Ms = 450;
+
+    final long sleepMs = 500;
+
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+
+    // With samples for a single partition, test that the sample is aged as expected
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+    ingestionDelayTracker.updateIngestionDelay(partition0Delay0, clock.millis(), partition0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay0);
+    // Advance clock and test aging
+    Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
+    ingestionDelayTracker.setClock(offsetClock);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+        (partition0Delay0 + partition0Offset0Ms));
+
+    // Add a new value below max and verify we are tracking the new max correctly
+    ingestionDelayTracker.updateIngestionDelay(partition0Delay1, offsetClock.millis(), partition0);
+    Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); // Save this as we need to verify aging later
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay1);
+    // Add some offset to the last sample and make sure we age that measure properly
+    offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms));
+    ingestionDelayTracker.setClock(offsetClock);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+        (partition0Delay1 + partition0Offset1Ms));
+
+    // Now try setting a new maximum in another partition
+    ingestionDelayTracker.updateIngestionDelay(partition1Delay0, offsetClock.millis(), partition1);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), partition1Delay0);
+    // Add some offset to the last sample and make sure we age that measure properly
+    offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms));
+    ingestionDelayTracker.setClock(offsetClock);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
+        (partition1Delay0 + partition1Offset0Ms));
+
+    ingestionDelayTracker.shutdown();
+  }
+
+  @Test
+  public void testStopTrackingIngestionDelay() {
+    final long maxTestDelay = 100;
+    final int maxPartition = 100;
+
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    // Record a number of partitions with delay equal to partition id
+    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+      ingestionDelayTracker.updateIngestionDelay(partitionGroupId, clock.millis(), partitionGroupId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), partitionGroupId);
+    }
+    // Stop tracking every partition, from the highest partition id down to 0
+    for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
+      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
+    }
+    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+      // Untracked partitions must return 0
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), 0);
+    }
+  }
+
+  @Test
+  public void testTickInactivePartitions() {
+    Assert.assertTrue(true);
+  }
+
+  @Test
+  public void testMarkPartitionForConfirmation() {
+    Assert.assertTrue(true);
+  }
+}
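`testTickInactivePartitions` and `testMarkPartitionForConfirmation` are still placeholders. As a hedged illustration of what they could exercise, here is a sketch of mark-then-verify, assuming (from the `onConsumingToOnline` javadoc and `getHostedPartitionsGroupIds`) that a partition marked on CONSUMING -> ONLINE is later checked against the partitions this server still hosts, and untracked if it is gone. `PartitionVerifier`, `onIngestionDelayUpdate`, `tick`, and the timeout parameter are hypothetical names, not Pinot's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper (not part of Pinot): verifies partitions marked on CONSUMING -> ONLINE.
class PartitionVerifier {
  private final Map<Integer, Long> _markedAtMs = new ConcurrentHashMap<>();
  private final Supplier<Set<Integer>> _hostedPartitions; // e.g. an ideal-state read (expensive)
  private final IntConsumer _stopTracking;                 // e.g. drop the partition's metrics
  private final long _verifyAfterMs;
  private final LongSupplier _nowMs;

  PartitionVerifier(Supplier<Set<Integer>> hostedPartitions, IntConsumer stopTracking, long verifyAfterMs,
      LongSupplier nowMs) {
    _hostedPartitions = hostedPartitions;
    _stopTracking = stopTracking;
    _verifyAfterMs = verifyAfterMs;
    _nowMs = nowMs;
  }

  // CONSUMING -> ONLINE: remember when the partition was first marked.
  void markPartitionForVerification(int partitionGroupId) {
    _markedAtMs.putIfAbsent(partitionGroupId, _nowMs.getAsLong());
  }

  // A fresh delay sample means a segment of this partition is consuming here again: no check needed.
  void onIngestionDelayUpdate(int partitionGroupId) {
    _markedAtMs.remove(partitionGroupId);
  }

  // Periodic timer tick. The hosted-partition lookup only happens if at least one mark has expired.
  void tick() {
    long now = _nowMs.getAsLong();
    Set<Integer> expired = new HashSet<>();
    for (Map.Entry<Integer, Long> entry : _markedAtMs.entrySet()) {
      if (now - entry.getValue() >= _verifyAfterMs) {
        expired.add(entry.getKey());
      }
    }
    if (expired.isEmpty()) {
      return;
    }
    Set<Integer> hosted = _hostedPartitions.get();
    for (int partitionGroupId : expired) {
      _markedAtMs.remove(partitionGroupId);
      if (!hosted.contains(partitionGroupId)) {
        _stopTracking.accept(partitionGroupId);
      }
    }
  }

  public static void main(String[] args) {
    PartitionVerifier verifier = new PartitionVerifier(() -> Set.of(1),
        p -> System.out.println("stop tracking partition " + p), 0L, System::currentTimeMillis);
    verifier.markPartitionForVerification(0);
    verifier.markPartitionForVerification(1);
    verifier.tick();
  }
}
```

Injecting the time source as a `LongSupplier` serves the same purpose as `setClock` in the tests above: verification can be driven deterministically without sleeping. Deferring the hosted-partition lookup until a mark expires matters because, per the javadoc of `getHostedPartitionsGroupIds`, that lookup reads ZooKeeper.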
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index a316669277..121e47f6f6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -200,4 +200,20 @@ public interface TableDataManager {
    * @return List of {@link SegmentErrorInfo}
    */
   Map<String, SegmentErrorInfo> getSegmentErrors();
+
+  /**
+   * Handles segment state transitions from CONSUMING to DROPPED. The default implementation does nothing.
+   *
+   * @param segmentNameStr name of the segment for which the state change is being handled
+   */
+  default void onConsumingToDropped(String segmentNameStr) {
+  }
+
+  /**
+   * Handles segment state transitions from CONSUMING to ONLINE. The default implementation does nothing.
+   *
+   * @param segmentNameStr name of the segment for which the state change is being handled
+   */
+  default void onConsumingToOnline(String segmentNameStr) {
+  }
 }
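Adding the hooks as `default` methods keeps the interface change source-compatible: existing `TableDataManager` implementations (for example, offline table managers) inherit the no-ops, and only the realtime manager overrides them. The self-contained sketch below shows the pattern; `SegmentTransitionHooks`, the sketch classes, and the hand-rolled name parsing are hypothetical, and the `table__partitionGroupId__sequenceNumber__creationTime` layout is an assumption about LLC segment names (Pinot itself parses them with `LLCSegmentName`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the pattern (not Pinot's TableDataManager).
interface SegmentTransitionHooks {
  // No-op defaults: implementations that do not care (e.g. offline tables) need no changes.
  default void onConsumingToDropped(String segmentNameStr) {
  }

  default void onConsumingToOnline(String segmentNameStr) {
  }
}

class OfflineTableManagerSketch implements SegmentTransitionHooks {
  // Inherits both no-op hooks.
}

class RealtimeTableManagerSketch implements SegmentTransitionHooks {
  final List<String> _events = new ArrayList<>();

  @Override
  public void onConsumingToDropped(String segmentNameStr) {
    _events.add("stop tracking partition " + partitionGroupIdOf(segmentNameStr));
  }

  @Override
  public void onConsumingToOnline(String segmentNameStr) {
    _events.add("verify partition " + partitionGroupIdOf(segmentNameStr));
  }

  // Assumes LLC names of the form table__partitionGroupId__sequenceNumber__creationTime.
  static int partitionGroupIdOf(String llcSegmentName) {
    String[] parts = llcSegmentName.split("__");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Not an LLC segment name: " + llcSegmentName);
    }
    return Integer.parseInt(parts[1]);
  }

  public static void main(String[] args) {
    SegmentTransitionHooks offline = new OfflineTableManagerSketch();
    offline.onConsumingToOnline("myTable__3__12__20230109T0143Z"); // no-op
    RealtimeTableManagerSketch realtime = new RealtimeTableManagerSketch();
    realtime.onConsumingToOnline("myTable__3__12__20230109T0143Z");
    System.out.println(realtime._events); // [verify partition 3]
  }
}
```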
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 4e74682220..67be761616 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -39,37 +39,56 @@ public class TableStateUtils {
   }
 
   /**
-   * Checks if all segments for the given @param tableNameWithType are succesfully loaded
-   * This function will get all segments in IDEALSTATE and CURRENTSTATE for the given table,
-   * and then check if all ONLINE segments in IDEALSTATE match with CURRENTSTATE.
-   * @param helixManager helix manager for the server instance
-   * @param tableNameWithType table name for which segment state is to be checked
-   * @return true if all segments for the given table are succesfully loaded. False otherwise
+   * Returns all segments of a given table that the ideal state assigns to this instance in the given state.
+   *
+   * @param helixManager instance of Helix manager
+   * @param tableNameWithType table for which we are obtaining segments
+   * @param state state of the segments to be returned
+   *
+   * @return List of segment names in the given state; empty if the table's ideal state cannot be found.
    */
-  public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+  public static List<String> getSegmentsInGivenStateForThisInstance(HelixManager helixManager, String tableNameWithType,
+      String state) {
     HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType));
+    List<String> segmentsInGivenState = new ArrayList<>();
     if (idealState == null) {
       LOGGER.warn("Failed to find ideal state for table: {}", tableNameWithType);
-      return false;
+      return segmentsInGivenState;
     }
 
-    // Get all ONLINE segments from idealState
+    // Get all segments in the given state from idealState
     String instanceName = helixManager.getInstanceName();
-    List<String> onlineSegments = new ArrayList<>();
     Map<String, Map<String, String>> idealStatesMap = idealState.getRecord().getMapFields();
     for (Map.Entry<String, Map<String, String>> entry : idealStatesMap.entrySet()) {
       String segmentName = entry.getKey();
       Map<String, String> instanceStateMap = entry.getValue();
       String expectedState = instanceStateMap.get(instanceName);
-      // Only track ONLINE segments assigned to the current instance
-      if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(expectedState)) {
+      // Only track segments assigned to the current instance in the given state
+      if (!state.equals(expectedState)) {
         continue;
       }
-      onlineSegments.add(segmentName);
+      segmentsInGivenState.add(segmentName);
     }
+    return segmentsInGivenState;
+  }
+
+  /**
+   * Checks if all segments of the given table (tableNameWithType) were successfully loaded.
+   * This function gets all segments in IDEALSTATE and CURRENTSTATE for the given table,
+   * and then checks if all ONLINE segments in IDEALSTATE match with CURRENTSTATE.
+   * @param helixManager helix manager for the server instance
+   * @param tableNameWithType table name for which segment state is to be checked
+   * @return true if all segments for the given table were successfully loaded, false otherwise
+   */
+  public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) {
+    HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+    String instanceName = helixManager.getInstanceName();
 
+    List<String> onlineSegments = getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
+        CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
     if (onlineSegments.size() > 0) {
       LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
       if (liveInstance == null) {
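After this refactor, `isAllSegmentsLoaded` (ONLINE) and `RealtimeTableDataManager.getHostedPartitionsGroupIds` (CONSUMING) share the same filter over the ideal state's map fields, which map each segment to an instance -> state map. The sketch below reproduces that filter over plain `java.util.Map`s so it runs without Helix or ZooKeeper; `IdealStateFilterSketch` and its method names are hypothetical, and the `table__partitionGroupId__sequenceNumber__creationTime` segment-name layout is an assumption (Pinot parses names with `LLCSegmentName`).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the filtering done by getSegmentsInGivenStateForThisInstance, but over a plain
// segment -> (instance -> state) map instead of a Helix IdealState.
class IdealStateFilterSketch {
  static List<String> segmentsInGivenState(Map<String, Map<String, String>> mapFields, String instanceName,
      String state) {
    List<String> segments = new ArrayList<>();
    for (Map.Entry<String, Map<String, String>> entry : mapFields.entrySet()) {
      // get() returns null when the segment is not assigned to this instance; state.equals(null) is false.
      if (state.equals(entry.getValue().get(instanceName))) {
        segments.add(entry.getKey());
      }
    }
    return segments;
  }

  // Mirrors getHostedPartitionsGroupIds: partitions with a CONSUMING segment on this instance.
  // Assumes LLC names of the form table__partitionGroupId__sequenceNumber__creationTime.
  static Set<Integer> hostedPartitionGroupIds(Map<String, Map<String, String>> mapFields, String instanceName) {
    Set<Integer> partitions = new HashSet<>();
    for (String segmentName : segmentsInGivenState(mapFields, instanceName, "CONSUMING")) {
      partitions.add(Integer.parseInt(segmentName.split("__")[1]));
    }
    return partitions;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> mapFields = Map.of(
        "t__0__5__20230109T0000Z", Map.of("Server_1", "CONSUMING", "Server_2", "CONSUMING"),
        "t__0__4__20230108T0000Z", Map.of("Server_1", "ONLINE"),
        "t__1__5__20230109T0000Z", Map.of("Server_2", "CONSUMING"));
    System.out.println(hostedPartitionGroupIds(mapFields, "Server_2")); // [0, 1]
  }
}
```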
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 68773ab89c..fda6caf5c1 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -136,6 +136,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
   protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
   protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;
   protected PinotEnvironmentProvider _pinotEnvironmentProvider;
+  protected volatile boolean _isServerReadyToServeQueries = false;
 
   @Override
   public void init(PinotConfiguration serverConf)
@@ -533,6 +534,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _serverInstance = new ServerInstance(serverConf, _helixManager, accessControlFactory);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
+    instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
@@ -582,7 +584,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _serverInstance.startQueryServer();
     _helixAdmin.setConfig(_instanceConfigScope,
         Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false)));
-
+    _isServerReadyToServeQueries = true;
     // Throttling for realtime consumption is disabled up to this point to allow maximum consumption during startup time
     RealtimeConsumptionRateManager.getInstance().enableThrottling();
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index a254009742..25abdfc9d7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
@@ -94,11 +95,17 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   private ServerMetrics _serverMetrics;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private SegmentUploader _segmentUploader;
+  private Supplier<Boolean> _isReadyToServeQueries = () -> true;
 
   // Fixed size LRU cache for storing last N errors on the instance.
   // Key is TableNameWithType-SegmentName pair.
   private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
 
+  @Override
+  public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) {
+    _isReadyToServeQueries = isServingQueries;
+  }
+
   @Override
   public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
       throws ConfigurationException {
@@ -225,7 +232,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
     TableDataManager tableDataManager =
         TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore,
-            _serverMetrics, _helixManager, _errorCache);
+            _serverMetrics, _helixManager, _errorCache, _isReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
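`BaseServerStarter` hands `InstanceDataManager` a `Supplier<Boolean>` (`() -> _isServerReadyToServeQueries`) rather than a `boolean`, and `HelixInstanceDataManager` forwards it to every table data manager it creates. Table data managers can be created while the server is still starting, before the flag flips to `true`; a captured `boolean` would stay `false`, whereas the supplier re-reads the `volatile` field on every call. The `() -> true` defaults keep callers that never wire a supplier (such as the tests) behaving as "always ready". A minimal sketch of the pattern, with hypothetical names (`ServerReadinessSketch`, `markReady`):

```java
import java.util.function.Supplier;

// Hypothetical miniature of the wiring in this commit (not BaseServerStarter itself).
class ServerReadinessSketch {
  // volatile: written once by the startup thread, read by other threads through the supplier.
  private volatile boolean _isServerReadyToServeQueries = false;

  // The lambda reads the field on every get(), so holders of the supplier observe the later flip.
  Supplier<Boolean> readinessSupplier() {
    return () -> _isServerReadyToServeQueries;
  }

  void markReady() {
    _isServerReadyToServeQueries = true;
  }

  public static void main(String[] args) {
    ServerReadinessSketch starter = new ServerReadinessSketch();
    Supplier<Boolean> isReady = starter.readinessSupplier(); // handed out before startup finishes
    System.out.println(isReady.get()); // false
    starter.markReady();
    System.out.println(isReady.get()); // true
  }
}
```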
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 8c5fa50b51..b85d13b953 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -91,15 +91,17 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
 
       TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
       Preconditions.checkNotNull(tableDataManager);
+      tableDataManager.onConsumingToOnline(segmentNameStr);
       SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentNameStr);
       // For this transition to be correct in helix, we should already have a segment that is consuming
       if (acquiredSegment == null) {
         throw new RuntimeException("Segment " + segmentNameStr + " + not present ");
       }
 
+      // TODO: https://github.com/apache/pinot/issues/10049
       try {
         if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) {
-          // We found a LLC segment that is not consuming right now, must be that we already swapped it with a
+          // We found an LLC segment that is not consuming right now, must be that we already swapped it with a
           // segment that has been built. Nothing to do for this state transition.
           _logger
               .info("Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition",
@@ -138,6 +140,11 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
     @Transition(from = "CONSUMING", to = "DROPPED")
     public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
+      String realtimeTableName = message.getResourceName();
+      String segmentNameStr = message.getPartitionName();
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      Preconditions.checkNotNull(tableDataManager);
+      tableDataManager.onConsumingToDropped(segmentNameStr);
       try {
         onBecomeOfflineFromConsuming(message, context);
         onBecomeDroppedFromOffline(message, context);

