This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d488bd1e088 Make the ResourceUtilizationChecker more easily extensible 
and add a PK count endpoint to the server side (#16146)
d488bd1e088 is described below

commit d488bd1e0881ca93f4ed50d7be63e8424590050f
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Thu Jun 26 21:32:27 2025 -0700

    Make the ResourceUtilizationChecker more easily extensible and add a PK 
count endpoint to the server side (#16146)
    
    * Expose server-side API to fetch the primary key count across all tables 
(upsert and dedup enabled)
    
    * Controller side changes to integrate primary key count check with 
resource utilization checker
    
    * Add support to skip the PK check from the Minion side
    
    * Rename class and methods
    
    * Fix log
    
    * Create a common interface for all resource utilization checkers and have 
disk and PK count implement that
    
    * Fix test
    
    * Minor rename
    
    * Remove the additional checker and related tests
    
    * Minor fixes
    
    * Address review comments
    
    * rename in test
    
    * Address review comments
    
    * Add an enum CheckPurpose to capture caller context, address other review 
comments
    
    * Add some comments around CheckPurpose
---
 .../restlet/resources/PrimaryKeyCountInfo.java     |  73 ++++++
 .../pinot/controller/BaseControllerStarter.java    |  11 +-
 .../helix/core/minion/PinotTaskManager.java        |   7 +-
 .../validation/DiskUtilizationChecker.java         |  13 +-
 .../RealtimeSegmentValidationManager.java          |   3 +-
 .../validation/ResourceUtilizationChecker.java     |  12 +-
 .../validation/ResourceUtilizationManager.java     |  24 +-
 .../controller/validation/UtilizationChecker.java  |  58 +++++
 .../validation/DiskUtilizationCheckerTest.java     |  57 +++--
 .../RealtimeSegmentValidationManagerTest.java      |   4 +-
 .../validation/ResourceUtilizationManagerTest.java |  70 +++++-
 .../manager/realtime/RealtimeTableDataManager.java |   5 +
 .../dedup/BasePartitionDedupMetadataManager.java   |   2 -
 .../local/dedup/BaseTableDedupMetadataManager.java |  10 +
 ...ConcurrentMapPartitionDedupMetadataManager.java |   2 +-
 .../local/dedup/PartitionDedupMetadataManager.java |   2 +
 .../local/dedup/TableDedupMetadataManager.java     |   8 +
 .../BasePartitionDedupMetadataManagerTest.java     |   2 +-
 .../server/api/resources/InstanceResource.java     |  22 ++
 .../server/api/resources/PrimaryKeyCount.java      |  90 +++++++
 .../pinot/server/api/PrimaryKeyCountTest.java      | 275 +++++++++++++++++++++
 21 files changed, 698 insertions(+), 52 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java
new file mode 100644
index 00000000000..57dffa05326
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/PrimaryKeyCountInfo.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PrimaryKeyCountInfo {
+  private final String _instanceId;
+  private final long _numPrimaryKeys;
+  private final Set<String> _upsertAndDedupTables;
+  private final long _lastUpdatedTimeInEpochMs;
+
+  public PrimaryKeyCountInfo(@JsonProperty("instanceId") String instanceId) {
+    _instanceId = instanceId;
+    _numPrimaryKeys = -1;
+    _upsertAndDedupTables = Set.of();
+    _lastUpdatedTimeInEpochMs = -1;
+  }
+
+  @JsonCreator
+  public PrimaryKeyCountInfo(@JsonProperty("instanceId") String instanceId,
+      @JsonProperty("numPrimaryKeys") long numPrimaryKeys,
+      @JsonProperty("upsertAndDedupTables") Set<String> upsertAndDedupTables,
+      @JsonProperty("lastUpdatedTimeInEpochMs") long lastUpdatedTimeInEpochMs) 
{
+    _instanceId = instanceId;
+    _numPrimaryKeys = numPrimaryKeys;
+    _upsertAndDedupTables = upsertAndDedupTables;
+    _lastUpdatedTimeInEpochMs = lastUpdatedTimeInEpochMs;
+  }
+
+  public String getInstanceId() {
+    return _instanceId;
+  }
+
+  public long getNumPrimaryKeys() {
+    return _numPrimaryKeys;
+  }
+
+  public Set<String> getUpsertAndDedupTables() {
+    return _upsertAndDedupTables;
+  }
+
+  public long getLastUpdatedTimeInEpochMs() {
+    return _lastUpdatedTimeInEpochMs;
+  }
+
+  public String toString() {
+    return "PrimaryKeyCountInfo{" + "_instanceId='" + _instanceId + "', 
_numPrimaryKeys=" + _numPrimaryKeys
+        + ", _upsertAndDedupTables=" + _upsertAndDedupTables + ", 
_lastUpdatedTimeInEpochMs="
+        + _lastUpdatedTimeInEpochMs + '}';
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 54854f0e5b0..8cca18613be 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -122,6 +122,7 @@ import 
org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
 import org.apache.pinot.controller.validation.ResourceUtilizationManager;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
+import org.apache.pinot.controller.validation.UtilizationChecker;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
@@ -212,6 +213,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected ExecutorService _rebalancerExecutorService;
   protected TableSizeReader _tableSizeReader;
   protected StorageQuotaChecker _storageQuotaChecker;
+  protected final List<UtilizationChecker> _utilizationCheckers = new 
ArrayList<>();
   protected DiskUtilizationChecker _diskUtilizationChecker;
   protected ResourceUtilizationManager _resourceUtilizationManager;
   protected RebalancePreChecker _rebalancePreChecker;
@@ -343,6 +345,10 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
             MAX_STATE_TRANSITIONS_PER_INSTANCE, constraintItem);
   }
 
+  protected void addUtilizationChecker(UtilizationChecker utilizationChecker) {
+    _utilizationCheckers.add(utilizationChecker);
+  }
+
   /**
    * Creates an instance of PinotHelixResourceManager.
    * <p>
@@ -554,7 +560,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         _helixResourceManager, _config);
 
     _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _config);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_config, 
_diskUtilizationChecker);
+    addUtilizationChecker(_diskUtilizationChecker);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_config, 
_utilizationCheckers);
     _rebalancePreChecker = 
RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass());
     _rebalancePreChecker.init(_helixResourceManager, _executorService, 
_config.getRebalanceDiskUtilizationThreshold());
     _rebalancerExecutorService = 
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
@@ -903,7 +910,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         _controllerMetrics, _executorService, _connectionManager);
     periodicTasks.add(responseStoreCleaner);
     PeriodicTask resourceUtilizationChecker = new 
ResourceUtilizationChecker(_config, _connectionManager,
-        _controllerMetrics, _diskUtilizationChecker, _executorService, 
_helixResourceManager);
+        _controllerMetrics, _utilizationCheckers, _executorService, 
_helixResourceManager);
     periodicTasks.add(resourceUtilizationChecker);
 
     return periodicTasks;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index ef86dbab2d6..96f9154241f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -57,6 +57,7 @@ import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerato
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.validation.ResourceUtilizationManager;
+import org.apache.pinot.controller.validation.UtilizationChecker;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -222,7 +223,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
       LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, 
tableNameWithType);
       try {
-        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType))
 {
+        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType,
+            UtilizationChecker.CheckPurpose.TASK_GENERATION)) {
           LOGGER.warn("Resource utilization is above threshold, skipping task 
creation for table: {}", tableName);
           _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
           continue;
@@ -721,7 +723,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
-        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)) {
+        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName,
+            UtilizationChecker.CheckPurpose.TASK_GENERATION)) {
           String message = String.format("Skipping tasks generation as 
resource utilization is not within limits for "
               + "table: %s. Disk utilization for one or more servers hosting 
this table has exceeded the threshold. "
               + "Tasks won't be generated until the issue is mitigated.", 
tableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
index 8edd8b6cd11..fa2f1bfbea2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class DiskUtilizationChecker {
+public class DiskUtilizationChecker implements UtilizationChecker {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DiskUtilizationChecker.class);
   private final int _resourceUtilizationCheckTimeoutMs;
   private final long _resourceUtilizationCheckerFrequencyMs;
@@ -54,10 +54,16 @@ public class DiskUtilizationChecker {
     _resourceUtilizationCheckerFrequencyMs = 
controllerConf.getResourceUtilizationCheckerFrequency() * 1000;
   }
 
+  @Override
+  public String getName() {
+    return DiskUtilizationChecker.class.getSimpleName();
+  }
+
   /**
    * Check if disk utilization for the requested table is within the 
configured limits.
    */
-  public boolean isDiskUtilizationWithinLimits(String tableNameWithType) {
+  @Override
+  public boolean isResourceUtilizationWithinLimits(String tableNameWithType, 
UtilizationChecker.CheckPurpose purpose) {
     if (StringUtils.isEmpty(tableNameWithType)) {
       throw new IllegalArgumentException("Table name found to be null or empty 
while computing disk utilization.");
     }
@@ -102,7 +108,8 @@ public class DiskUtilizationChecker {
   /**
    * Compute disk utilization for the requested instances using the 
<code>CompletionServiceHelper</code>.
    */
-  public void computeDiskUtilization(BiMap<String, String> 
endpointsToInstances,
+  @Override
+  public void computeResourceUtilization(BiMap<String, String> 
endpointsToInstances,
       CompletionServiceHelper completionServiceHelper) {
     List<String> diskUtilizationUris = new 
ArrayList<>(endpointsToInstances.size());
     for (String endpoint : endpointsToInstances.keySet()) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 21d3d428024..39cf6f1b75f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -157,7 +157,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     }
     // Perform resource utilization checks.
     boolean isResourceUtilizationWithinLimits =
-        
_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType);
+        
_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType,
+            UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
     if (!isResourceUtilizationWithinLimits) {
       LOGGER.warn("Resource utilization limit exceeded for table: {}", 
tableNameWithType);
       _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
index 60990f5151d..338d194c85e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.validation;
 
 import com.google.common.collect.BiMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -45,18 +46,18 @@ public class ResourceUtilizationChecker extends 
BasePeriodicTask {
 
   private final PoolingHttpClientConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
-  private final DiskUtilizationChecker _diskUtilizationChecker;
+  private final List<UtilizationChecker> _utilizationCheckers;
   private final Executor _executor;
   private final PinotHelixResourceManager _helixResourceManager;
 
   public ResourceUtilizationChecker(ControllerConf config, 
PoolingHttpClientConnectionManager connectionManager,
-      ControllerMetrics controllerMetrics, DiskUtilizationChecker 
diskUtilizationChecker, Executor executor,
+      ControllerMetrics controllerMetrics, List<UtilizationChecker> 
utilizationCheckers, Executor executor,
       PinotHelixResourceManager pinotHelixResourceManager) {
     super(TASK_NAME, config.getResourceUtilizationCheckerFrequency(),
         config.getResourceUtilizationCheckerInitialDelay());
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
-    _diskUtilizationChecker = diskUtilizationChecker;
+    _utilizationCheckers = utilizationCheckers;
     _executor = executor;
     _helixResourceManager = pinotHelixResourceManager;
   }
@@ -77,7 +78,10 @@ public class ResourceUtilizationChecker extends 
BasePeriodicTask {
       BiMap<String, String> endpointsToInstances = 
instanceAdminEndpoints.inverse();
       CompletionServiceHelper completionServiceHelper =
           new CompletionServiceHelper(_executor, _connectionManager, 
endpointsToInstances);
-      _diskUtilizationChecker.computeDiskUtilization(endpointsToInstances, 
completionServiceHelper);
+      for (UtilizationChecker utilizationChecker : _utilizationCheckers) {
+        LOGGER.debug("Computing resource utilization for checker: {}", 
utilizationChecker.getName());
+        utilizationChecker.computeResourceUtilization(endpointsToInstances, 
completionServiceHelper);
+      }
     } catch (Exception e) {
       LOGGER.error("Caught exception while running task: {}", _taskName, e);
       _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
index c53507913ba..396d6ba68c4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.validation;
 
+import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.slf4j.Logger;
@@ -28,15 +29,16 @@ public class ResourceUtilizationManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceUtilizationManager.class);
 
   private final boolean _isResourceUtilizationCheckEnabled;
-  private final DiskUtilizationChecker _diskUtilizationChecker;
+  private final List<UtilizationChecker> _utilizationCheckers;
 
-  public ResourceUtilizationManager(ControllerConf controllerConf, 
DiskUtilizationChecker diskUtilizationChecker) {
+  public ResourceUtilizationManager(ControllerConf controllerConf, 
List<UtilizationChecker> utilizationCheckers) {
     _isResourceUtilizationCheckEnabled = 
controllerConf.isResourceUtilizationCheckEnabled();
-    LOGGER.info("Resource utilization check is: {}", 
_isResourceUtilizationCheckEnabled ? "enabled" : "disabled");
-    _diskUtilizationChecker = diskUtilizationChecker;
+    LOGGER.info("Resource utilization check is: {}, with {} resource 
utilization checkers",
+        _isResourceUtilizationCheckEnabled ? "enabled" : "disabled", 
utilizationCheckers.size());
+    _utilizationCheckers = utilizationCheckers;
   }
 
-  public boolean isResourceUtilizationWithinLimits(String tableNameWithType) {
+  public boolean isResourceUtilizationWithinLimits(String tableNameWithType, 
UtilizationChecker.CheckPurpose purpose) {
     if (!_isResourceUtilizationCheckEnabled) {
       return true;
     }
@@ -44,6 +46,16 @@ public class ResourceUtilizationManager {
       throw new IllegalArgumentException("Table name found to be null or empty 
while checking resource utilization.");
     }
     LOGGER.info("Checking resource utilization for table: {}", 
tableNameWithType);
-    return 
_diskUtilizationChecker.isDiskUtilizationWithinLimits(tableNameWithType);
+    boolean overallIsResourceUtilizationWithinLimits = true;
+    for (UtilizationChecker utilizationChecker : _utilizationCheckers) {
+      boolean isResourceUtilizationWithinLimits =
+          
utilizationChecker.isResourceUtilizationWithinLimits(tableNameWithType, 
purpose);
+      LOGGER.info("For utilization checker: {}, 
isResourceUtilizationWithinLimits: {}, purpose: {}",
+          utilizationChecker.getName(), isResourceUtilizationWithinLimits, 
purpose);
+      if (!isResourceUtilizationWithinLimits) {
+        overallIsResourceUtilizationWithinLimits = false;
+      }
+    }
+    return overallIsResourceUtilizationWithinLimits;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
new file mode 100644
index 00000000000..2a4cdc9ce19
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.collect.BiMap;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+
+
+/**
+ * Interface for all utilization checkers to be used by 
ResourceUtilizationManager and ResourceUtilizationChecker
+ */
+public interface UtilizationChecker {
+  /**
+   * Get the name of the utilization checker
+   */
+  String getName();
+
+  /**
+   * Returns true if the resource's utilization is within limits
+   * @param tableNameWithType table name with type
+   * @param purpose purpose of this check
+   */
+  boolean isResourceUtilizationWithinLimits(String tableNameWithType, 
CheckPurpose purpose);
+
+  /**
+   * Computes the resource's utilization
+   * @param endpointsToInstances map of endpoints to instances
+   * @param completionServiceHelper the completion service helper
+   */
+  void computeResourceUtilization(BiMap<String, String> endpointsToInstances,
+      CompletionServiceHelper completionServiceHelper);
+
+  /**
+   * Passed to 'isResourceUtilizationWithinLimits' so that each 
'UtilizationChecker' can decide if any special handling
+   * is required depending on the origin of the check
+   */
+  enum CheckPurpose {
+    // REALTIME_INGESTION if the check is performed from the realtime 
ingestion code path to pause ingestion
+    // TASK_GENERATION if the check is performed from the task generation 
framework to pause creation of new tasks
+    REALTIME_INGESTION, TASK_GENERATION
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
index ea0b5aa2ba0..3e29c0c57f7 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
@@ -44,28 +44,35 @@ public class DiskUtilizationCheckerTest {
 
   private static final String DISK_UTILIZATION_PATH = "/disk/utilization/path";
   private PinotHelixResourceManager _helixResourceManager;
-  private ControllerConf _controllerConf;
   private DiskUtilizationChecker _diskUtilizationChecker;
 
   @BeforeMethod
   public void setUp() {
     _helixResourceManager = mock(PinotHelixResourceManager.class);
-    _controllerConf = mock(ControllerConf.class);
+    ControllerConf controllerConf = mock(ControllerConf.class);
 
-    
when(_controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH);
-    when(_controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8);
-    when(_controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000);
-    
when(_controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L);
+    
when(controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH);
+    when(controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8);
+    when(controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000);
+    
when(controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L);
 
-    _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _controllerConf);
+    _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, controllerConf);
   }
 
   @Test
   public void testIsDiskUtilizationWithinLimitsNullOrEmptyTableName() {
     Assert.assertThrows(IllegalArgumentException.class,
-        () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(null));
+        () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits(null,
+            UtilizationChecker.CheckPurpose.REALTIME_INGESTION));
     Assert.assertThrows(IllegalArgumentException.class,
-        () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(""));
+        () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits("",
+            UtilizationChecker.CheckPurpose.REALTIME_INGESTION));
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits(null,
+            UtilizationChecker.CheckPurpose.TASK_GENERATION));
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> _diskUtilizationChecker.isResourceUtilizationWithinLimits("",
+            UtilizationChecker.CheckPurpose.TASK_GENERATION));
   }
 
   @Test
@@ -73,16 +80,26 @@ public class DiskUtilizationCheckerTest {
     String tableName = "test_OFFLINE";
     
when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null);
 
-    boolean result = 
_diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    boolean result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertTrue(result);
+
+    result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertTrue(result);
   }
 
   @Test
   public void testIsDiskUtilizationWithinLimitsNonExistentRealtimeTable() {
     String tableName = "test_REALTIME";
-    
when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null);
+    
when(_helixResourceManager.getRealtimeTableConfig(tableName)).thenReturn(null);
 
-    boolean result = 
_diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    boolean result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertTrue(result);
+
+    result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertTrue(result);
   }
 
@@ -107,7 +124,12 @@ public class DiskUtilizationCheckerTest {
     diskUsageInfoMap.put("server2", diskUsageInfo2);
     ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
 
-    boolean result = 
_diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    boolean result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertTrue(result);
+
+    result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertTrue(result);
   }
 
@@ -132,7 +154,12 @@ public class DiskUtilizationCheckerTest {
     diskUsageInfoMap.put("server2", diskUsageInfo2);
     ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
 
-    boolean result = 
_diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    boolean result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertFalse(result);
+
+    result = 
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableName,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertFalse(result);
   }
 
@@ -162,7 +189,7 @@ public class DiskUtilizationCheckerTest {
     when(completionServiceHelper.doMultiGetRequest(anyList(), anyString(), 
anyBoolean(), anyMap(), anyInt(),
         anyString())).thenReturn(serviceResponse);
 
-    
_diskUtilizationChecker.computeDiskUtilization(instanceAdminEndpoints.inverse(),
 completionServiceHelper);
+    
_diskUtilizationChecker.computeResourceUtilization(instanceAdminEndpoints.inverse(),
 completionServiceHelper);
 
     DiskUsageInfo diskUsageInfo1 = 
ResourceUtilizationInfo.getDiskUsageInfo("server1");
     DiskUsageInfo diskUsageInfo2 = 
ResourceUtilizationInfo.getDiskUsageInfo("server2");
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
index 7ca553f2b36..9c435a48e88 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -104,8 +104,8 @@ public class RealtimeSegmentValidationManagerTest {
     when(pauseStatus.getPauseFlag()).thenReturn(isTablePaused);
     when(pauseStatus.getReasonCode()).thenReturn(reasonCode);
     
when(_llcRealtimeSegmentManager.getPauseStatusDetails(tableName)).thenReturn(pauseStatus);
-    
when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)).thenReturn(
-        !isResourceUtilizationExceeded);
+    
when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName,
+        
UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(!isResourceUtilizationExceeded);
     
when(_pinotHelixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
     
when(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)).thenReturn(isQuotaExceeded);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
index bb61d6e0834..9487105d530 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.validation;
 
+import java.util.List;
 import org.apache.pinot.controller.ControllerConf;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -28,6 +29,7 @@ import org.testng.annotations.Test;
 public class ResourceUtilizationManagerTest {
 
   private DiskUtilizationChecker _diskUtilizationChecker;
+  private List<UtilizationChecker> _utilizationCheckers;
   private ControllerConf _controllerConf;
   private ResourceUtilizationManager _resourceUtilizationManager;
   private final String _testTable = "myTable_OFFLINE";
@@ -35,51 +37,93 @@ public class ResourceUtilizationManagerTest {
+  // Rebuilds the mocks before every test; the disk checker mock is the only checker placed in the list.
   @BeforeMethod
   public void setUp() {
     _diskUtilizationChecker = Mockito.mock(DiskUtilizationChecker.class);
+    _utilizationCheckers = List.of(_diskUtilizationChecker);
     _controllerConf = Mockito.mock(ControllerConf.class);
   }
 
   @Test
   public void testIsResourceUtilizationWithinLimitsWhenCheckIsDisabled() {
     Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(false);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
 
-    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable);
+    // The check is switched off in the controller config, so both purposes are expected to pass.
+    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertTrue(result, "Resource utilization should be within limits when the check is disabled");
+
+    result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertTrue(result, "Resource utilization should be within limits when the check is disabled");
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testIsResourceUtilizationWithinLimitsWithNullTableName() {
     Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
+
+    // Null table name with the REALTIME_INGESTION purpose; the test expects IllegalArgumentException.
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits(null,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testIsResourceUtilizationWithinLimitsWithNullTableNameIsForMinionTrue() {
+    Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
 
-    _resourceUtilizationManager.isResourceUtilizationWithinLimits(null);
+    // Same null-name case as the sibling test, but with the TASK_GENERATION purpose.
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits(null,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testIsResourceUtilizationWithinLimitsWithEmptyTableName() {
     Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
 
-    _resourceUtilizationManager.isResourceUtilizationWithinLimits("");
+    // Empty table name with the REALTIME_INGESTION purpose; the test expects IllegalArgumentException.
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits("",
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testIsResourceUtilizationWithinLimitsWithEmptyTableNameIsForMinionTrue() {
+    Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
+
+    // Same empty-name case as the sibling test, but with the TASK_GENERATION purpose.
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits("",
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
   }
 
   @Test
   public void testIsResourceUtilizationWithinLimitsWhenCheckIsEnabled() {
     Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
-    Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(true);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+    // The disk checker mock is the only registered checker, so the assertion messages below name only the
+    // disk check.
+    Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(true);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
 
-    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable);
+    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
     Assert.assertTrue(result, "Resource utilization should be within limits when disk check returns true");
+
+    Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION)).thenReturn(true);
+    result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
+    Assert.assertTrue(result, "Resource utilization should be within limits when disk check returns true");
   }
 
   @Test
   public void testIsResourceUtilizationWithinLimitsWhenCheckFails() {
     Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(true);
-    Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(false);
-    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+    Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION)).thenReturn(false);
+    // Stub TASK_GENERATION explicitly as well: an unstubbed boolean method on a Mockito mock already answers
+    // false, so without this stub the second assertion would pass without an intentional failing checker.
+    Mockito.when(_diskUtilizationChecker.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION)).thenReturn(false);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _utilizationCheckers);
+
+    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.REALTIME_INGESTION);
+    Assert.assertFalse(result, "Resource utilization should not be within limits when disk check returns false");
 
-    boolean result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable);
+    result = _resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable,
+        UtilizationChecker.CheckPurpose.TASK_GENERATION);
     Assert.assertFalse(result, "Resource utilization should not be within limits when disk check returns false");
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ab36b37eaaf..e25f8da05e2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -879,6 +879,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     return _tableUpsertMetadataManager;
   }
 
+  /**
+   * Returns the table-level dedup metadata manager held by this data manager.
+   * NOTE(review): PrimaryKeyCount in pinot-server calls this accessor from non-test code, so the
+   * VisibleForTesting marker looks narrower than the actual usage - confirm and drop it if so.
+   */
+  @VisibleForTesting
+  public TableDedupMetadataManager getTableDedupMetadataManager() {
+    return _tableDedupMetadataManager;
+  }
+
   /**
    * Retrieves a mapping of partition id to the primary key count for the 
partition.
    *
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 243509e3473..b8fa0d6a649 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -410,8 +410,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     _logger.info("Closed the metadata manager");
   }
 
-  protected abstract long getNumPrimaryKeys();
-
   protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 61988fd4ad1..b0ca500c7bd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -123,6 +124,15 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
    */
   protected abstract PartitionDedupMetadataManager 
createPartitionDedupMetadataManager(Integer partitionId);
 
+  /**
+   * Builds a new map from partition id to the primary key count reported by that partition's metadata manager.
+   */
+  @Override
+  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
+    Map<Integer, Long> partitionToPrimaryKeyCount = new HashMap<>();
+    _partitionMetadataManagerMap.forEach(
+        (partitionID, dedupMetadataManager) -> partitionToPrimaryKeyCount.put(partitionID,
+            dedupMetadataManager.getNumPrimaryKeys()));
+    return partitionToPrimaryKeyCount;
+  }
+
   @Override
   public DedupContext getContext() {
     return _context;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index b4ef9ca63ac..5a1c0440bd8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -141,7 +141,7 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
   }
 
   @Override
-  protected long getNumPrimaryKeys() {
+  public long getNumPrimaryKeys() {
+    // The reported count is the current size of the primary-key map.
     return _primaryKeyToSegmentAndTimeMap.size();
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 8ccd7a51837..f3dece6b6c7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -93,4 +93,6 @@ public interface PartitionDedupMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
    */
   void stop();
+
+  /**
+   * Returns the number of primary keys currently held by this partition's metadata manager.
+   */
+  long getNumPrimaryKeys();
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 03007c90349..a2d3065d565 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.dedup;
 
 import java.io.Closeable;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
@@ -49,4 +50,11 @@ public interface TableDedupMetadataManager extends Closeable {
    * Stops the metadata manager. After invoking this method, no access to the 
metadata will be accepted.
    */
   void stop();
+
+  /**
+   * Retrieves a mapping of partition id to the primary key count for the partition.
+   *
+   * @return A {@code Map} where keys are partition id and values are count of primary keys for that specific
+   *     partition
+   */
+  Map<Integer, Long> getPartitionToPrimaryKeyCount();
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
index 475d4a39297..e02da4408e0 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
@@ -79,7 +79,7 @@ public class BasePartitionDedupMetadataManagerTest {
     }
 
     @Override
-    protected long getNumPrimaryKeys() {
+    public long getNumPrimaryKeys() {
+      // Fixed stub value; the method is public now that PartitionDedupMetadataManager declares it.
       return 0;
     }
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
index a285d074a5b..bfe42c47955 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
@@ -40,13 +40,16 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo;
 import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.server.api.AdminApiApplication;
 import org.apache.pinot.server.starter.ServerInstance;
 
@@ -126,4 +129,23 @@ public class InstanceResource {
     DiskUsageInfo diskUsageInfo = 
DiskUtilization.computeDiskUsage(_instanceId, pathStr);
     return ResourceUtils.convertToJsonString(diskUsageInfo);
   }
+
+  /**
+   * Reports the number of upsert / dedup primary keys held by this server instance, serialized as JSON.
+   *
+   * @param headers injected HTTP headers; not read by this method
+   * @return JSON string form of the {@link PrimaryKeyCountInfo} computed for this instance
+   * @throws WebApplicationException with status 500 when the server has no instance data manager
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/primaryKeyCount")
+  @ApiOperation(value = "Show number of primary keys", notes = "Total number of upsert / dedup primary keys")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getPrimaryKeyCountInfo(@Context HttpHeaders headers)
+      throws WebApplicationException {
+    // Use InstanceDataManager to fetch details about the number of primary keys across all upsert / dedup tables
+    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
+    if (instanceDataManager == null) {
+      throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    PrimaryKeyCountInfo primaryKeyCountInfo =
+        PrimaryKeyCount.computeNumberOfPrimaryKeys(_instanceId, instanceDataManager);
+    return ResourceUtils.convertToJsonString(primaryKeyCountInfo);
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
new file mode 100644
index 00000000000..39cbfa318e8
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helper that totals the upsert / dedup primary keys held by the realtime tables of a server instance.
+ */
+public class PrimaryKeyCount {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PrimaryKeyCount.class);
+
+  private PrimaryKeyCount() {
+  }
+
+  /**
+   * Computes the number of primary keys for this instance
+   *
+   * @param instanceId id copied into the result; must be non-empty
+   * @param instanceDataManager source of the table names and per-table data managers; must be non-null
+   * @return the instance id, the summed primary key count, the names of the tables that reported at least one
+   *     partition, and the time at which the result was built
+   * @throws IllegalArgumentException if {@code instanceId} is null or empty, or if {@code instanceDataManager}
+   *     is null
+   */
+  public static PrimaryKeyCountInfo computeNumberOfPrimaryKeys(String instanceId,
+      InstanceDataManager instanceDataManager) {
+    if (StringUtils.isEmpty(instanceId)) {
+      throw new IllegalArgumentException("InstanceID cannot be null or empty while computing the number of upsert / "
+          + "dedup primary keys.");
+    }
+
+    if (instanceDataManager == null) {
+      throw new IllegalArgumentException("instanceDataManager cannot be null while computing the number of upsert / "
+          + "dedup primary keys.");
+    }
+
+    Set<String> allTables = instanceDataManager.getAllTables();
+    long totalPrimaryKeyCount = 0L;
+    Set<String> tablesWithPrimaryKeys = new HashSet<>();
+    for (String tableNameWithType : allTables) {
+      TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableNameWithType);
+      if (tableDataManager == null) {
+        LOGGER.warn("TableDataManager for table: {} is null, skipping", tableNameWithType);
+        continue;
+      }
+      // Only realtime table data managers are inspected; any other manager type contributes nothing.
+      if (tableDataManager instanceof RealtimeTableDataManager) {
+        Map<Integer, Long> partitionToPrimaryKeyCount =
+            getPartitionToPrimaryKeyCount((RealtimeTableDataManager) tableDataManager);
+
+        // A table is listed only when it reported at least one partition entry.
+        if (!partitionToPrimaryKeyCount.isEmpty()) {
+          tablesWithPrimaryKeys.add(tableNameWithType);
+        }
+
+        // A null per-partition value is counted as zero.
+        for (Long numPrimaryKeys : partitionToPrimaryKeyCount.values()) {
+          totalPrimaryKeyCount += numPrimaryKeys == null ? 0 : numPrimaryKeys;
+        }
+      }
+    }
+
+    return new PrimaryKeyCountInfo(instanceId, totalPrimaryKeyCount, tablesWithPrimaryKeys, System.currentTimeMillis());
+  }
+
+  /**
+   * Returns the partition id to primary key count map of the given realtime table: the upsert manager's map when
+   * upsert is enabled, otherwise the dedup manager's map when dedup is enabled, otherwise an empty map.
+   */
+  private static Map<Integer, Long> getPartitionToPrimaryKeyCount(@NotNull RealtimeTableDataManager tableDataManager) {
+    // Fetch the primary key count per partition if either upsert or dedup is enabled
+    if (tableDataManager.isUpsertEnabled()) {
+      return tableDataManager.getTableUpsertMetadataManager().getPartitionToPrimaryKeyCount();
+    } else if (tableDataManager.isDedupEnabled()) {
+      return tableDataManager.getTableDedupMetadataManager().getPartitionToPrimaryKeyCount();
+    }
+    return Map.of();
+  }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
new file mode 100644
index 00000000000..14ba30e6188
--- /dev/null
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/PrimaryKeyCountTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.restlet.resources.PrimaryKeyCountInfo;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
+import org.apache.pinot.server.api.resources.PrimaryKeyCount;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class PrimaryKeyCountTest {
+
+  @Test
+  public void testComputePrimaryKeysForEmptyOrNullInstanceId() {
+    // Mock the instance data manager
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+
+    // Both an empty and a null instance id are expected to be rejected
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> PrimaryKeyCount.computeNumberOfPrimaryKeys("", instanceDataManager));
+
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> PrimaryKeyCount.computeNumberOfPrimaryKeys(null, instanceDataManager));
+  }
+
+  @Test
+  public void testComputePrimaryKeysForNullInstanceDataManager() {
+    // A null data manager is expected to be rejected even though the instance id is non-empty
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> PrimaryKeyCount.computeNumberOfPrimaryKeys("instanceId", null));
+  }
+
+
+  @Test
+  public void testComputePrimaryKeysNullTableDataManager() {
+    Set<String> allTables = new HashSet<>();
+    allTables.add("myTable_REALTIME");
+    allTables.add("myTable_OFFLINE");
+
+    // Mock the instance data manager; both tables are listed but neither resolves to a table data manager
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager("myTable_REALTIME")).thenReturn(null);
+    when(instanceDataManager.getTableDataManager("myTable_OFFLINE")).thenReturn(null);
+    when(instanceDataManager.getAllTables()).thenReturn(allTables);
+
+    String instanceId = "instance42";
+    PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId,
+        instanceDataManager);
+
+    Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId);
+    Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), 0);
+    Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables());
+    Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 0);
+    Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis());
+  }
+
+  @Test
+  public void testComputePrimaryKeysNonUpsertDedupTables() {
+    Set<String> allTables = new HashSet<>();
+    allTables.add("myTable_REALTIME");
+    allTables.add("myTable_OFFLINE");
+
+    // Mock the Table data manager; the realtime table has neither upsert nor dedup enabled
+    RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class);
+    when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(false);
+    when(realtimeTableDataManager.isDedupEnabled()).thenReturn(false);
+    OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class);
+    when(realtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(null);
+    when(realtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(null);
+
+    // Mock the instance data manager
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager("myTable_REALTIME"))
+        .thenReturn(realtimeTableDataManager);
+    when(instanceDataManager.getTableDataManager("myTable_OFFLINE"))
+        .thenReturn(offlineTableDataManager);
+    when(instanceDataManager.getAllTables()).thenReturn(allTables);
+
+    String instanceId = "instance42";
+    PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId,
+        instanceDataManager);
+
+    Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId);
+    Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), 0);
+    Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables());
+    Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 0);
+    Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis());
+  }
+
+  @Test
+  public void testComputePrimaryKeysUpsertOnly() {
+    Set<String> allTables = new HashSet<>();
+    allTables.add("myTable_REALTIME");
+    allTables.add("myTable_OFFLINE");
+
+    Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>();
+    partitionToPrimaryKeyCountMap.put(0, 42L);
+    partitionToPrimaryKeyCountMap.put(1, 420L);
+    partitionToPrimaryKeyCountMap.put(2, 4200L);
+
+    // Expected total is the sum over all partitions of the upsert table
+    long primaryKeyCount = 0;
+    for (Long count : partitionToPrimaryKeyCountMap.values()) {
+      primaryKeyCount += count;
+    }
+
+    // Mock the Table data manager
+    RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class);
+    when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(true);
+    when(realtimeTableDataManager.isDedupEnabled()).thenReturn(false);
+    OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class);
+
+    // Mock the upsert manager
+    TableUpsertMetadataManager tableUpsertMetadataManager = mock(TableUpsertMetadataManager.class);
+    when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
+    when(realtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager);
+
+    // Mock the instance data manager
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager("myTable_REALTIME"))
+        .thenReturn(realtimeTableDataManager);
+    when(instanceDataManager.getTableDataManager("myTable_OFFLINE"))
+        .thenReturn(offlineTableDataManager);
+    when(instanceDataManager.getAllTables()).thenReturn(allTables);
+
+    String instanceId = "instance42";
+    PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId,
+        instanceDataManager);
+
+    Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId);
+    Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), primaryKeyCount);
+    Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables());
+    Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 1);
+    Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTable_REALTIME"));
+    Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis());
+  }
+
+  @Test
+  public void testComputePrimaryKeysDedupOnly() {
+    Set<String> allTables = new HashSet<>();
+    allTables.add("myTable_REALTIME");
+    allTables.add("myTable_OFFLINE");
+
+    Map<Integer, Long> partitionToPrimaryKeyCountMap = new HashMap<>();
+    partitionToPrimaryKeyCountMap.put(0, 42L);
+    partitionToPrimaryKeyCountMap.put(1, 420L);
+    partitionToPrimaryKeyCountMap.put(2, 4200L);
+
+    // Expected total is the sum over all partitions of the dedup table
+    long primaryKeyCount = 0;
+    for (Long count : partitionToPrimaryKeyCountMap.values()) {
+      primaryKeyCount += count;
+    }
+
+    // Mock the Table data manager
+    RealtimeTableDataManager realtimeTableDataManager = mock(RealtimeTableDataManager.class);
+    when(realtimeTableDataManager.isUpsertEnabled()).thenReturn(false);
+    when(realtimeTableDataManager.isDedupEnabled()).thenReturn(true);
+    OfflineTableDataManager offlineTableDataManager = mock(OfflineTableDataManager.class);
+
+    // Mock the dedup manager
+    TableDedupMetadataManager tableDedupMetadataManager = mock(TableDedupMetadataManager.class);
+    when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(partitionToPrimaryKeyCountMap);
+    when(realtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager);
+
+    // Mock the instance data manager
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager("myTable_REALTIME"))
+        .thenReturn(realtimeTableDataManager);
+    when(instanceDataManager.getTableDataManager("myTable_OFFLINE"))
+        .thenReturn(offlineTableDataManager);
+    when(instanceDataManager.getAllTables()).thenReturn(allTables);
+
+    String instanceId = "instance42";
+    PrimaryKeyCountInfo primaryKeyCountInfo = PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId,
+        instanceDataManager);
+
+    Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId);
+    Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), primaryKeyCount);
+    Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables());
+    Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 1);
+    Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTable_REALTIME"));
+    Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= System.currentTimeMillis());
+  }
+  /**
+   * Verifies {@code PrimaryKeyCount.computeNumberOfPrimaryKeys} for an instance that hosts one
+   * upsert-enabled REALTIME table, one dedup-enabled REALTIME table and one OFFLINE table, all
+   * backed by Mockito mocks. The reported primary key count must equal the sum of the stubbed
+   * per-partition counts of the upsert and dedup tables, and the reported table set must contain
+   * exactly the two REALTIME table names.
+   */
+  @Test
+  public void testComputePrimaryKeysUpsertAndDedup() {
+    Set<String> allTables = new HashSet<>();
+    allTables.add("myTableUpsert_REALTIME");
+    allTables.add("myTableDedup_REALTIME");
+    allTables.add("myTable_OFFLINE");
+    // Per-partition primary key counts that the mocked upsert metadata manager will return
+    Map<Integer, Long> upsertPartitionToPrimaryKeyCountMap = new HashMap<>();
+    upsertPartitionToPrimaryKeyCountMap.put(0, 42L);
+    upsertPartitionToPrimaryKeyCountMap.put(1, 420L);
+    upsertPartitionToPrimaryKeyCountMap.put(2, 4200L);
+    // Per-partition primary key counts that the mocked dedup metadata manager will return
+    Map<Integer, Long> dedupPartitionToPrimaryKeyCountMap = new HashMap<>();
+    dedupPartitionToPrimaryKeyCountMap.put(0, 2042L);
+    dedupPartitionToPrimaryKeyCountMap.put(1, 777L);
+    // Expected total: every upsert partition count plus every dedup partition count
+    long primaryKeyCount = 0;
+    for (Long count : upsertPartitionToPrimaryKeyCountMap.values()) {
+      primaryKeyCount += count;
+    }
+    for (Long count : dedupPartitionToPrimaryKeyCountMap.values()) {
+      primaryKeyCount += count;
+    }
+
+    // Mock the table data managers: one upsert-only, one dedup-only, and one offline (unstubbed)
+    RealtimeTableDataManager upsertRealtimeTableDataManager = 
mock(RealtimeTableDataManager.class);
+    when(upsertRealtimeTableDataManager.isUpsertEnabled()).thenReturn(true);
+    when(upsertRealtimeTableDataManager.isDedupEnabled()).thenReturn(false);
+    RealtimeTableDataManager dedupRealtimeTableDataManager = 
mock(RealtimeTableDataManager.class);
+    when(dedupRealtimeTableDataManager.isUpsertEnabled()).thenReturn(false);
+    when(dedupRealtimeTableDataManager.isDedupEnabled()).thenReturn(true);
+    OfflineTableDataManager offlineTableDataManager = 
mock(OfflineTableDataManager.class);
+
+    // Mock the upsert metadata manager and attach it to the upsert table data manager
+    TableUpsertMetadataManager tableUpsertMetadataManager = 
mock(TableUpsertMetadataManager.class);
+    
when(tableUpsertMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(upsertPartitionToPrimaryKeyCountMap);
+    
when(upsertRealtimeTableDataManager.getTableUpsertMetadataManager()).thenReturn(tableUpsertMetadataManager);
+
+    // Mock the dedup metadata manager and attach it to the dedup table data manager
+    TableDedupMetadataManager tableDedupMetadataManager = 
mock(TableDedupMetadataManager.class);
+    
when(tableDedupMetadataManager.getPartitionToPrimaryKeyCount()).thenReturn(dedupPartitionToPrimaryKeyCountMap);
+    
when(dedupRealtimeTableDataManager.getTableDedupMetadataManager()).thenReturn(tableDedupMetadataManager);
+
+    // Mock the instance data manager so it serves all three tables by name
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    when(instanceDataManager.getTableDataManager("myTableUpsert_REALTIME"))
+        .thenReturn(upsertRealtimeTableDataManager);
+    when(instanceDataManager.getTableDataManager("myTableDedup_REALTIME"))
+        .thenReturn(dedupRealtimeTableDataManager);
+    when(instanceDataManager.getTableDataManager("myTable_OFFLINE"))
+        .thenReturn(offlineTableDataManager);
+    when(instanceDataManager.getAllTables()).thenReturn(allTables);
+    // Run the computation under test against the mocked instance
+    String instanceId = "instance42";
+    PrimaryKeyCountInfo primaryKeyCountInfo = 
PrimaryKeyCount.computeNumberOfPrimaryKeys(instanceId,
+        instanceDataManager);
+    // Expect the summed count and exactly the two REALTIME tables; the timestamp must not be in the future
+    Assert.assertEquals(primaryKeyCountInfo.getInstanceId(), instanceId);
+    Assert.assertEquals(primaryKeyCountInfo.getNumPrimaryKeys(), 
primaryKeyCount);
+    Assert.assertNotNull(primaryKeyCountInfo.getUpsertAndDedupTables());
+    Assert.assertEquals(primaryKeyCountInfo.getUpsertAndDedupTables().size(), 
2);
+    
Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTableUpsert_REALTIME"));
+    
Assert.assertTrue(primaryKeyCountInfo.getUpsertAndDedupTables().contains("myTableDedup_REALTIME"));
+    Assert.assertTrue(primaryKeyCountInfo.getLastUpdatedTimeInEpochMs() <= 
System.currentTimeMillis());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to