This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 00b5a73b20 Keep get tables API with and without database (#12804)
00b5a73b20 is described below

commit 00b5a73b20415aaea0f9bb4f44c07ad21f563193
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Apr 8 01:10:31 2024 -0700

    Keep get tables API with and without database (#12804)
---
 .../pinot/controller/BaseControllerStarter.java    | 125 +++---
 .../api/resources/PinotBrokerRestletResource.java  |  13 +-
 .../PinotLeadControllerRestletResource.java        |   3 +-
 .../helix/core/PinotHelixResourceManager.java      |  79 ++--
 .../core/cleanup/StaleInstancesCleanupTask.java    |   9 +-
 .../helix/core/minion/PinotTaskManager.java        |  25 +-
 .../core/periodictask/ControllerPeriodicTask.java  |   9 +-
 .../helix/RealtimeConsumerMonitorTest.java         |  85 ++---
 .../controller/helix/SegmentStatusCheckerTest.java | 425 ++++++++++-----------
 .../periodictask/ControllerPeriodicTaskTest.java   |  20 +-
 .../helix/core/retention/RetentionManagerTest.java |  71 ++--
 11 files changed, 376 insertions(+), 488 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f78a49f2f9..91df2e0743 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -201,8 +201,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     _helixClusterName = _config.getHelixClusterName();
     ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER);
 
-    PinotInsecureMode.setPinotInInsecureMode(
-        Boolean.valueOf(_config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
+    PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf(
+        _config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
             CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
 
     setupHelixSystemProperties();
@@ -531,8 +531,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
       if (tableConfig != null) {
         Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
         try {
-          StreamConfig.validateConsumerType(
-              streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap);
+          StreamConfig.validateConsumerType(streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"),
+              streamConfigMap);
         } catch (Exception e) {
           existingHlcTables.add(rt);
         }
@@ -587,66 +587,63 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
     ZkHelixPropertyStore<ZNRecord> propertyStore = _helixResourceManager.getPropertyStore();
 
-    _helixResourceManager.getDatabaseNames().stream()
-        .map(_helixResourceManager::getAllTables)
-        .flatMap(List::stream)
-        .forEach(tableNameWithType -> {
-          Pair<TableConfig, Integer> tableConfigWithVersion =
-              ZKMetadataProvider.getTableConfigWithVersion(propertyStore, 
tableNameWithType);
-          if (tableConfigWithVersion == null) {
-            // This might due to table deletion, just log it here.
-            LOGGER.warn("Failed to find table config for table: {}, the table 
likely already got deleted",
-                tableNameWithType);
-            return;
-          }
-          TableConfig tableConfig = tableConfigWithVersion.getLeft();
-          String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-          String schemaPath = 
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
-          boolean schemaExists = propertyStore.exists(schemaPath, 
AccessOption.PERSISTENT);
-          String existSchemaName = 
tableConfig.getValidationConfig().getSchemaName();
-          if (existSchemaName == null || existSchemaName.equals(rawTableName)) 
{
-            // Although the table config is valid, we still need to ensure the 
schema exists
-            if (!schemaExists) {
-              LOGGER.warn("Failed to find schema for table: {}", 
tableNameWithType);
-              tableWithoutSchemaCount.getAndIncrement();
-              return;
-            }
-            // Table config is already in good status
-            return;
-          }
-          misconfiguredTableCount.getAndIncrement();
-          if (schemaExists) {
-            // If a schema named `rawTableName` already exists, then likely 
this is a misconfiguration.
-            // Reset schema name in table config to null to let the table 
point to the existing schema.
-            LOGGER.warn("Schema: {} already exists, fix the schema name in 
table config from {} to null", rawTableName,
-                existSchemaName);
-          } else {
-            // Copy the schema current table referring to to `rawTableName` if 
it does not exist
-            Schema schema = _helixResourceManager.getSchema(existSchemaName);
-            if (schema == null) {
-              LOGGER.warn("Failed to find schema: {} for table: {}", 
existSchemaName, tableNameWithType);
-              tableWithoutSchemaCount.getAndIncrement();
-              return;
-            }
-            schema.setSchemaName(rawTableName);
-            if (propertyStore.create(schemaPath, 
SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT)) {
-              LOGGER.info("Copied schema: {} to {}", existSchemaName, 
rawTableName);
-            } else {
-              LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, 
rawTableName);
-              failedToCopySchemaCount.getAndIncrement();
-              return;
-            }
-          }
-          // Update table config to remove schema name
-          tableConfig.getValidationConfig().setSchemaName(null);
-          if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, 
tableConfigWithVersion.getRight())) {
-            LOGGER.info("Removed schema name from table config for table: {}", 
tableNameWithType);
-            fixedSchemaTableCount.getAndIncrement();
-          } else {
-            LOGGER.warn("Failed to update table config for table: {}", 
tableNameWithType);
-            failedToUpdateTableConfigCount.getAndIncrement();
-          }
-        });
+    _helixResourceManager.getAllTables().forEach(tableNameWithType -> {
+      Pair<TableConfig, Integer> tableConfigWithVersion =
+          ZKMetadataProvider.getTableConfigWithVersion(propertyStore, 
tableNameWithType);
+      if (tableConfigWithVersion == null) {
+        // This might due to table deletion, just log it here.
+        LOGGER.warn("Failed to find table config for table: {}, the table 
likely already got deleted",
+            tableNameWithType);
+        return;
+      }
+      TableConfig tableConfig = tableConfigWithVersion.getLeft();
+      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+      String schemaPath = 
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
+      boolean schemaExists = propertyStore.exists(schemaPath, 
AccessOption.PERSISTENT);
+      String existSchemaName = 
tableConfig.getValidationConfig().getSchemaName();
+      if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
+        // Although the table config is valid, we still need to ensure the 
schema exists
+        if (!schemaExists) {
+          LOGGER.warn("Failed to find schema for table: {}", 
tableNameWithType);
+          tableWithoutSchemaCount.getAndIncrement();
+          return;
+        }
+        // Table config is already in good status
+        return;
+      }
+      misconfiguredTableCount.getAndIncrement();
+      if (schemaExists) {
+        // If a schema named `rawTableName` already exists, then likely this 
is a misconfiguration.
+        // Reset schema name in table config to null to let the table point to 
the existing schema.
+        LOGGER.warn("Schema: {} already exists, fix the schema name in table 
config from {} to null", rawTableName,
+            existSchemaName);
+      } else {
+        // Copy the schema current table referring to to `rawTableName` if it 
does not exist
+        Schema schema = _helixResourceManager.getSchema(existSchemaName);
+        if (schema == null) {
+          LOGGER.warn("Failed to find schema: {} for table: {}", 
existSchemaName, tableNameWithType);
+          tableWithoutSchemaCount.getAndIncrement();
+          return;
+        }
+        schema.setSchemaName(rawTableName);
+        if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema), 
AccessOption.PERSISTENT)) {
+          LOGGER.info("Copied schema: {} to {}", existSchemaName, 
rawTableName);
+        } else {
+          LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName, 
rawTableName);
+          failedToCopySchemaCount.getAndIncrement();
+          return;
+        }
+      }
+      // Update table config to remove schema name
+      tableConfig.getValidationConfig().setSchemaName(null);
+      if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig, 
tableConfigWithVersion.getRight())) {
+        LOGGER.info("Removed schema name from table config for table: {}", 
tableNameWithType);
+        fixedSchemaTableCount.getAndIncrement();
+      } else {
+        LOGGER.warn("Failed to update table config for table: {}", 
tableNameWithType);
+        failedToUpdateTableConfigCount.getAndIncrement();
+      }
+    });
     LOGGER.info(
         "Found {} tables misconfigured, {} tables without schema. Successfully 
fixed schema for {} tables, failed to "
             + "fix {} tables due to copy schema failure, failed to fix {} 
tables due to update table config failure.",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
index 1863819ab9..4fdca71a82 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -127,10 +127,8 @@ public class PinotBrokerRestletResource {
   @ApiOperation(value = "List tables to brokers mappings", notes = "List 
tables to brokers mappings")
   public Map<String, List<String>> getTablesToBrokersMapping(
       @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state, 
@Context HttpHeaders headers) {
-    Map<String, List<String>> resultMap = new HashMap<>();
-    
_pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE))
-        .forEach(table -> resultMap.put(table, getBrokersForTable(table, null, 
state, headers)));
-    return resultMap;
+    return 
_pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE)).stream()
+        .collect(Collectors.toMap(table -> table, table -> 
getBrokersForTable(table, null, state, headers)));
   }
 
   @GET
@@ -201,11 +199,8 @@ public class PinotBrokerRestletResource {
   @ApiOperation(value = "List tables to brokers mappings", notes = "List 
tables to brokers mappings")
   public Map<String, List<InstanceInfo>> getTablesToBrokersMappingV2(
       @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state, 
@Context HttpHeaders headers) {
-    Map<String, List<InstanceInfo>> resultMap = new HashMap<>();
-    String databaseName = headers.getHeaderString(DATABASE);
-    _pinotHelixResourceManager.getAllRawTables(databaseName).stream()
-        .forEach(table -> resultMap.put(table, getBrokersForTableV2(table, 
null, state, headers)));
-    return resultMap;
+    return 
_pinotHelixResourceManager.getAllRawTables(headers.getHeaderString(DATABASE)).stream()
+        .collect(Collectors.toMap(table -> table, table -> 
getBrokersForTableV2(table, null, state, headers)));
   }
 
   @GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java
index 5f09f74336..16f13e0471 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLeadControllerRestletResource.java
@@ -109,8 +109,7 @@ public class PinotLeadControllerRestletResource {
     }
 
     // Assigns all the tables to the relevant partitions.
-    List<String> tableNames = _pinotHelixResourceManager.getAllTables(
-        headers.getHeaderString(DATABASE));
+    List<String> tableNames = 
_pinotHelixResourceManager.getAllTables(headers.getHeaderString(DATABASE));
     for (String tableName : tableNames) {
       String rawTableName = TableNameBuilder.extractRawTableName(tableName);
       int partitionId = 
LeadControllerUtils.getPartitionIdForTable(rawTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f1761ce866..680e475d63 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -716,13 +716,12 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Get all table names (with type suffix) in default database.
+   * Get all table names (with type suffix) in all databases.
    *
-   * @return List of table names in default database
+   * @return List of table names
    */
-  @Deprecated
   public List<String> getAllTables() {
-    return getAllTables(null);
+    return 
getAllResources().stream().filter(TableNameBuilder::isTableResource).collect(Collectors.toList());
   }
 
   /**
@@ -732,23 +731,18 @@ public class PinotHelixResourceManager {
    * @return List of table names in provided database name
    */
   public List<String> getAllTables(@Nullable String databaseName) {
-    List<String> tableNames = new ArrayList<>();
-    for (String resourceName : getAllResources()) {
-      if (TableNameBuilder.isTableResource(resourceName)
-          && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
-        tableNames.add(resourceName);
-      }
-    }
-    return tableNames;
+    return getAllResources().stream().filter(
+        resourceName -> TableNameBuilder.isTableResource(resourceName) && 
DatabaseUtils.isPartOfDatabase(resourceName,
+            databaseName)).collect(Collectors.toList());
   }
 
   /**
-   * Get all offline table names from default database.
+   * Get all offline table names from all databases.
    *
-   * @return List of offline table names in default database
+   * @return List of offline table names
    */
   public List<String> getAllOfflineTables() {
-    return getAllOfflineTables(null);
+    return 
getAllResources().stream().filter(TableNameBuilder::isOfflineTableResource).collect(Collectors.toList());
   }
 
   /**
@@ -758,23 +752,18 @@ public class PinotHelixResourceManager {
    * @return List of offline table names in provided database name
    */
   public List<String> getAllOfflineTables(@Nullable String databaseName) {
-    List<String> offlineTableNames = new ArrayList<>();
-    for (String resourceName : getAllResources()) {
-      if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
-          && TableNameBuilder.isOfflineTableResource(resourceName)) {
-        offlineTableNames.add(resourceName);
-      }
-    }
-    return offlineTableNames;
+    return getAllResources().stream().filter(
+        resourceName -> TableNameBuilder.isOfflineTableResource(resourceName) 
&& DatabaseUtils.isPartOfDatabase(
+            resourceName, databaseName)).collect(Collectors.toList());
   }
 
   /**
-   * Get all dimension table names from default database.
+   * Get all dimension table names from all databases.
    *
-   * @return List of dimension table names in default database
+   * @return List of dimension table names
    */
   public List<String> getAllDimensionTables() {
-    return getAllDimensionTables(null);
+    return _tableCache.getAllDimensionTables();
   }
 
   /**
@@ -785,17 +774,16 @@ public class PinotHelixResourceManager {
    */
   public List<String> getAllDimensionTables(@Nullable String databaseName) {
     return _tableCache.getAllDimensionTables().stream()
-        .filter(table -> DatabaseUtils.isPartOfDatabase(table, databaseName))
-        .collect(Collectors.toList());
+        .filter(table -> DatabaseUtils.isPartOfDatabase(table, 
databaseName)).collect(Collectors.toList());
   }
 
   /**
-   * Get all realtime table names from default database.
+   * Get all realtime table names from all databases.
    *
-   * @return List of realtime table names in default database
+   * @return List of realtime table names
    */
   public List<String> getAllRealtimeTables() {
-    return getAllRealtimeTables(null);
+    return 
getAllResources().stream().filter(TableNameBuilder::isRealtimeTableResource).collect(Collectors.toList());
   }
 
   /**
@@ -805,23 +793,19 @@ public class PinotHelixResourceManager {
    * @return List of realtime table names in provided database name
    */
   public List<String> getAllRealtimeTables(@Nullable String databaseName) {
-    List<String> realtimeTableNames = new ArrayList<>();
-    for (String resourceName : getAllResources()) {
-      if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
-          && TableNameBuilder.isRealtimeTableResource(resourceName)) {
-        realtimeTableNames.add(resourceName);
-      }
-    }
-    return realtimeTableNames;
+    return getAllResources().stream().filter(
+        resourceName -> TableNameBuilder.isRealtimeTableResource(resourceName) 
&& DatabaseUtils.isPartOfDatabase(
+            resourceName, databaseName)).collect(Collectors.toList());
   }
 
   /**
-   * Get all raw table names in default database.
+   * Get all raw table names in all databases.
    *
-   * @return List of raw table names in default database
+   * @return List of raw table names
    */
   public List<String> getAllRawTables() {
-    return getAllRawTables(null);
+    return getAllResources().stream().filter(TableNameBuilder::isTableResource)
+        
.map(TableNameBuilder::extractRawTableName).distinct().collect(Collectors.toList());
   }
 
   /**
@@ -831,14 +815,9 @@ public class PinotHelixResourceManager {
    * @return List of raw table names in provided database name
    */
   public List<String> getAllRawTables(@Nullable String databaseName) {
-    Set<String> rawTableNames = new HashSet<>();
-    for (String resourceName : getAllResources()) {
-      if (TableNameBuilder.isTableResource(resourceName)
-          && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
-        rawTableNames.add(TableNameBuilder.extractRawTableName(resourceName));
-      }
-    }
-    return new ArrayList<>(rawTableNames);
+    return getAllResources().stream().filter(
+        resourceName -> TableNameBuilder.isTableResource(resourceName) && 
DatabaseUtils.isPartOfDatabase(resourceName,
+            
databaseName)).map(TableNameBuilder::extractRawTableName).distinct().collect(Collectors.toList());
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
index 027712bde8..b257462985 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTask.java
@@ -138,12 +138,9 @@ public class StaleInstancesCleanupTask extends BasePeriodicTask {
 
   private Set<String> getServerInstancesInUse() {
     Set<String> serverInstancesInUse = new HashSet<>();
-    _pinotHelixResourceManager.getDatabaseNames().stream()
-        .map(_pinotHelixResourceManager::getAllTables)
-        .flatMap(List::stream)
-        .forEach(tableName -> serverInstancesInUse.addAll(
-          
Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName))
-              .map(is -> 
is.getInstanceSet(tableName)).orElse(Collections.emptySet())));
+    _pinotHelixResourceManager.getAllTables().forEach(tableName -> 
serverInstancesInUse.addAll(
+        
Optional.ofNullable(_pinotHelixResourceManager.getTableIdealState(tableName))
+            .map(is -> 
is.getInstanceSet(tableName)).orElse(Collections.emptySet())));
     return serverInstancesInUse;
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 9b2fced8c2..2cdbf8c1df 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -480,13 +480,12 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   }
 
   /**
-   * Public API to schedule tasks (all task types) for all tables in default 
database.
+   * Public API to schedule tasks (all task types) for all tables in all 
databases.
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  @Deprecated
   public synchronized Map<String, List<String>> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(CommonConstants.DEFAULT_DATABASE), false);
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false);
   }
 
   /**
@@ -494,7 +493,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> 
scheduleTasksForDatabase(String database) {
+  public synchronized Map<String, List<String>> 
scheduleTasksForDatabase(@Nullable String database) {
     return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), 
false);
   }
 
@@ -605,8 +604,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
         }
       } catch (Exception e) {
         numErrorTasksScheduled++;
-        LOGGER.error("Failed to schedule task type {} on minion instance {} 
with task configs: {}",
-            taskType, minionInstanceTag, pinotTaskConfigs, e);
+        LOGGER.error("Failed to schedule task type {} on minion instance {} 
with task configs: {}", taskType,
+            minionInstanceTag, pinotTaskConfigs, e);
       }
     }
     if (numErrorTasksScheduled > 0) {
@@ -629,14 +628,13 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   }
 
   /**
-   * Public API to schedule task for the given task type in default database.
+   * Public API to schedule task for the given task type in all databases.
    * It might be called from the non-leader controller.
    * Returns the list of task names, or {@code null} if no task is scheduled.
    */
-  @Deprecated
   @Nullable
   public synchronized List<String> scheduleTask(String taskType) {
-    return scheduleTaskForDatabase(taskType, CommonConstants.DEFAULT_DATABASE);
+    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables());
   }
 
   /**
@@ -645,13 +643,18 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
    * Returns the list of task name, or {@code null} if no task is scheduled.
    */
   @Nullable
-  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
String database) {
+  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
@Nullable String database) {
+    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database));
+  }
+
+  @Nullable
+  private List<String> scheduleTask(String taskType, List<String> tables) {
     PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
     Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
 
     // Scan all table configs to get the tables with task enabled
     List<TableConfig> enabledTableConfigs = new ArrayList<>();
-    for (String tableNameWithType : 
_pinotHelixResourceManager.getAllTables(database)) {
+    for (String tableNameWithType : tables) {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
       if (tableConfig != null && tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig()
           .isTaskTypeEnabled(taskType)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 47dc218f43..d3584df068 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.periodictask;
 
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -69,12 +68,8 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
       // Check if we have a specific table against which this task needs to be 
run.
       String propTableNameWithType = (String) 
periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
       // Process the tables that are managed by this controller
-      List<String> allTables = propTableNameWithType == null
-          ? _pinotHelixResourceManager.getDatabaseNames().stream()
-            .map(_pinotHelixResourceManager::getAllTables)
-            .flatMap(List::stream)
-            .collect(Collectors.toList())
-          : Collections.singletonList(propTableNameWithType);
+      List<String> allTables =
+          propTableNameWithType != null ? List.of(propTableNameWithType) : 
_pinotHelixResourceManager.getAllTables();
 
       Set<String> currentLeaderOfTables = allTables.stream()
           .filter(_leadControllerManager::isLeaderForTable)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
index 051fd784b6..26928deb3c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pinot.controller.helix;
 
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -43,15 +40,14 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 
 
 public class RealtimeConsumerMonitorTest {
@@ -59,17 +55,16 @@ public class RealtimeConsumerMonitorTest {
   @Test
   public void realtimeBasicTest()
       throws Exception {
-    final String tableName = "myTable_REALTIME";
-    final String rawTableName = 
TableNameBuilder.extractRawTableName(tableName);
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
+    String rawTableName = "myTable";
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn")
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn")
             .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build();
+
     LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 
0, System.currentTimeMillis());
     LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 
1, System.currentTimeMillis());
     LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 
0, System.currentTimeMillis());
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(realtimeTableName);
     idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), 
"pinot1", "ONLINE");
     idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), 
"pinot2", "ONLINE");
     idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), 
"pinot1", "CONSUMING");
@@ -79,7 +74,7 @@ public class RealtimeConsumerMonitorTest {
     idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(tableName);
+    ExternalView externalView = new ExternalView(realtimeTableName);
     externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", 
"ONLINE");
     externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", 
"ONLINE");
     externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", 
"CONSUMING");
@@ -91,13 +86,11 @@ public class RealtimeConsumerMonitorTest {
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
       ZkHelixPropertyStore<ZNRecord> helixPropertyStore = 
mock(ZkHelixPropertyStore.class);
-      
when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+      
when(helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig);
       
when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore);
-      when(helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      
when(helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
+      
when(helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView);
       ZNRecord znRecord = new ZNRecord("0");
       znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, 
"10000");
       when(helixPropertyStore.get(anyString(), any(), 
anyInt())).thenReturn(znRecord);
@@ -121,61 +114,53 @@ public class RealtimeConsumerMonitorTest {
     // So, the consumer monitor should show: 1. partition-1 has 0 lag; 
partition-2 has some non-zero lag.
     // Segment 1 in replicas:
     TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> 
response = new TreeMap<>();
-    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part1ServerConsumingSegmentInfo = new ArrayList<>(2);
-    part1ServerConsumingSegmentInfo.add(
-        getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"));
-    part1ServerConsumingSegmentInfo.add(
-        getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0"));
-
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part1ServerConsumingSegmentInfo =
+        List.of(getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", 
"0"),
+            getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", 
"0"));
     response.put(segmentPartition1Seq1.getSegmentName(), 
part1ServerConsumingSegmentInfo);
 
     // Segment 2 in replicas
-    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part2ServerConsumingSegmentInfo = new ArrayList<>(2);
-    part2ServerConsumingSegmentInfo.add(
-        getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"));
-    part2ServerConsumingSegmentInfo.add(
-        getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000"));
-
+    List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> 
part2ServerConsumingSegmentInfo =
+        List.of(getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", 
"0"),
+            getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", 
"60000"));
     response.put(segmentPartition2Seq0.getSegmentName(), 
part2ServerConsumingSegmentInfo);
 
     ConsumingSegmentInfoReader consumingSegmentReader = 
mock(ConsumingSegmentInfoReader.class);
-    when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
-        .thenReturn(new 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
+    when(consumingSegmentReader.getConsumingSegmentsInfo(realtimeTableName, 
10000)).thenReturn(
+        new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 
0));
     RealtimeConsumerMonitor realtimeConsumerMonitor =
-        new RealtimeConsumerMonitor(config, helixResourceManager, 
leadControllerManager,
-            controllerMetrics, consumingSegmentReader);
+        new RealtimeConsumerMonitor(config, helixResourceManager, 
leadControllerManager, controllerMetrics,
+            consumingSegmentReader);
     realtimeConsumerMonitor.start();
     realtimeConsumerMonitor.run();
-    
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
tableName, 1,
+
+    assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
realtimeTableName, 1,
         ControllerGauge.MAX_RECORDS_LAG), 0);
-    
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
tableName, 2,
+    assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
realtimeTableName, 2,
         ControllerGauge.MAX_RECORDS_LAG), 40);
-    
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
tableName, 1,
-            ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
-    
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
tableName, 2,
+    assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
realtimeTableName, 1,
+        ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
+    assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, 
realtimeTableName, 2,
         ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000);
   }
 
   ConsumingSegmentInfoReader.ConsumingSegmentInfo 
getConsumingSegmentInfoForServer(String serverName,
       String partitionId, String currentOffset, String upstreamLatestOffset, 
String availabilityLagMs) {
-    Map<String, String> currentOffsetMap = 
Collections.singletonMap(partitionId, currentOffset);
-    Map<String, String> latestUpstreamOffsetMap = 
Collections.singletonMap(partitionId, upstreamLatestOffset);
-    Map<String, String> recordsLagMap = Collections.singletonMap(partitionId, 
String.valueOf(
-        Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset)));
-    Map<String, String> availabilityLagMsMap = 
Collections.singletonMap(partitionId, availabilityLagMs);
+    Map<String, String> currentOffsetMap = Map.of(partitionId, currentOffset);
+    Map<String, String> latestUpstreamOffsetMap = Map.of(partitionId, 
upstreamLatestOffset);
+    Map<String, String> recordsLagMap =
+        Map.of(partitionId, 
String.valueOf(Long.parseLong(upstreamLatestOffset) - 
Long.parseLong(currentOffset)));
+    Map<String, String> availabilityLagMsMap = Map.of(partitionId, 
availabilityLagMs);
 
     ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo =
         new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, 
latestUpstreamOffsetMap, recordsLagMap,
             availabilityLagMsMap);
-    return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, 
"CONSUMING", -1,
-        currentOffsetMap, partitionOffsetInfo);
+    return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, 
"CONSUMING", -1, currentOffsetMap,
+        partitionOffsetInfo);
   }
 
   Map<String, String> getStreamConfigMap() {
-    return ImmutableMap.of(
-        "streamType", "kafka",
-        "stream.kafka.consumer.type", "simple",
-        "stream.kafka.topic.name", "test",
+    return Map.of("streamType", "kafka", "stream.kafka.consumer.type", 
"simple", "stream.kafka.topic.name", "test",
         "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
         "stream.kafka.consumer.factory.class.name",
         
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index a1dd8f2697..3161c9da20 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pinot.controller.helix;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -52,19 +48,21 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 
 
 public class SegmentStatusCheckerTest {
+  private final ExecutorService _executorService = 
Executors.newFixedThreadPool(1);
+
   private SegmentStatusChecker _segmentStatusChecker;
   private PinotHelixResourceManager _helixResourceManager;
   private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
@@ -73,18 +71,15 @@ public class SegmentStatusCheckerTest {
   private ControllerMetrics _controllerMetrics;
   private ControllerConf _config;
   private TableSizeReader _tableSizeReader;
-  private ExecutorService _executorService = Executors.newFixedThreadPool(1);
 
   @Test
   public void offlineBasicTest()
       throws Exception {
-    final String tableName = "myTable_OFFLINE";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
+    String offlineTableName = "myTable_OFFLINE";
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(2).build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build();
 
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(offlineTableName);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -101,7 +96,7 @@ public class SegmentStatusCheckerTest {
     idealState.setReplicas("2");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(tableName);
+    ExternalView externalView = new ExternalView(offlineTableName);
     externalView.setState("myTable_0", "pinot1", "ONLINE");
     externalView.setState("myTable_0", "pinot2", "ONLINE");
     externalView.setState("myTable_1", "pinot1", "ERROR");
@@ -114,27 +109,23 @@ public class SegmentStatusCheckerTest {
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
+      
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
+      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
     }
     {
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
       // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, 
{myTable_3 -> myTable_4, IN_PROGRESS},
       // myTable_1 and myTable_4 will be skipped for the metrics.
-      SegmentLineage segmentLineage = new SegmentLineage(tableName);
+      SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
       
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
-          new LineageEntry(Collections.singletonList("myTable_1"), 
Collections.singletonList("myTable_3"),
-              LineageEntryState.COMPLETED, 11111L));
+          new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), 
LineageEntryState.COMPLETED, 11111L));
       
segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(),
-          new LineageEntry(Collections.singletonList("myTable_3"), 
Collections.singletonList("myTable_4"),
-              LineageEntryState.IN_PROGRESS, 11111L));
-      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), 
eq(AccessOption.PERSISTENT)))
-          .thenReturn(segmentLineage.toZNRecord());
+          new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), 
LineageEntryState.IN_PROGRESS, 11111L));
+      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), 
any(),
+          
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
     }
     {
       _config = mock(ControllerConf.class);
@@ -158,40 +149,41 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
offlineTableName,
         ControllerGauge.REPLICATION_FROM_CONFIG), 2);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENT_COUNT), 3);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(), ControllerGauge.SEGMENT_COUNT),
+        3);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.NUMBER_OF_REPLICAS), 2);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_OF_REPLICAS), 66);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.NUMBER_OF_REPLICAS), 2);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_OF_REPLICAS), 66);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
   public void realtimeBasicTest()
       throws Exception {
-    final String tableName = "myTable_REALTIME";
-    final String rawTableName = 
TableNameBuilder.extractRawTableName(tableName);
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
+    String rawTableName = "myTable";
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn")
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn")
             .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build();
-    final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, 
System.currentTimeMillis());
-    final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, 
System.currentTimeMillis());
-    final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, 
System.currentTimeMillis());
-    IdealState idealState = new IdealState(tableName);
+
+    LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, 
System.currentTimeMillis());
+    LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, 
System.currentTimeMillis());
+    LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, 
System.currentTimeMillis());
+    IdealState idealState = new IdealState(realtimeTableName);
     idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE");
     idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE");
     idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE");
@@ -204,7 +196,7 @@ public class SegmentStatusCheckerTest {
     idealState.setReplicas("3");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(tableName);
+    ExternalView externalView = new ExternalView(realtimeTableName);
     externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE");
     externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE");
     externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE");
@@ -218,13 +210,11 @@ public class SegmentStatusCheckerTest {
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
-      
when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+      
when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView);
       ZNRecord znRecord = new ZNRecord("0");
       znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, 
"10000");
       when(_helixPropertyStore.get(anyString(), any(), 
anyInt())).thenReturn(znRecord);
@@ -251,27 +241,25 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.REPLICATION_FROM_CONFIG), 3);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.REPLICATION_FROM_CONFIG), 3);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.NUMBER_OF_REPLICAS), 3);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.NUMBER_OF_REPLICAS), 3);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_OF_REPLICAS), 100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2);
   }
 
   Map<String, String> getStreamConfigMap() {
-    return ImmutableMap.of(
-        "streamType", "kafka",
-        "stream.kafka.consumer.type", "simple",
-        "stream.kafka.topic.name", "test",
+    return Map.of("streamType", "kafka", "stream.kafka.consumer.type", 
"simple", "stream.kafka.topic.name", "test",
         "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
         "stream.kafka.consumer.factory.class.name",
         
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
@@ -281,8 +269,7 @@ public class SegmentStatusCheckerTest {
   public void missingEVPartitionTest()
       throws Exception {
     String offlineTableName = "myTable_OFFLINE";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(offlineTableName);
+
     IdealState idealState = new IdealState(offlineTableName);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
@@ -317,21 +304,19 @@ public class SegmentStatusCheckerTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore;
     {
       propertyStore = (ZkHelixPropertyStore<ZNRecord>) 
mock(ZkHelixPropertyStore.class);
-      when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, 
AccessOption.PERSISTENT))
-          .thenReturn(znrecord);
+      when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, 
AccessOption.PERSISTENT)).thenReturn(
+          znrecord);
     }
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
       
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
       
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_3"))
-          .thenReturn(new SegmentZKMetadata(znrecord));
+      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_3")).thenReturn(
+          new SegmentZKMetadata(znrecord));
     }
     {
       _config = mock(ControllerConf.class);
@@ -355,25 +340,25 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.NUMBER_OF_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.NUMBER_OF_REPLICAS), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.TABLE_COMPRESSED_SIZE), 1111);
   }
 
   @Test
   public void missingEVTest()
       throws Exception {
-    final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-    IdealState idealState = new IdealState(tableName);
+    String realtimeTableName = "myTable_REALTIME";
+
+    IdealState idealState = new IdealState(realtimeTableName);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
@@ -388,11 +373,9 @@ public class SegmentStatusCheckerTest {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -416,30 +399,28 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.NUMBER_OF_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
+        0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
   public void missingIdealTest()
       throws Exception {
-    final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<>();
-    allTableNames.add(tableName);
+    String realtimeTableName = "myTable_REALTIME";
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(null);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null);
+      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -463,24 +444,24 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
+
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS));
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
-            ControllerGauge.NUMBER_OF_REPLICAS));
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
-            ControllerGauge.PERCENT_OF_REPLICAS));
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
-            ControllerGauge.TABLE_COMPRESSED_SIZE));
+    assertFalse(
+        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS));
+    assertFalse(
+        MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS));
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.TABLE_COMPRESSED_SIZE));
   }
 
   @Test
   public void missingEVPartitionPushTest()
       throws Exception {
     String offlineTableName = "myTable_OFFLINE";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(offlineTableName);
+
     IdealState idealState = new IdealState(offlineTableName);
     idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
     idealState.setPartitionState("myTable_1", "pinot1", "ONLINE");
@@ -527,15 +508,13 @@ public class SegmentStatusCheckerTest {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
       
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
       
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_0"))
-          .thenReturn(new SegmentZKMetadata(znrecord));
-      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_2"))
-          .thenReturn(new SegmentZKMetadata(znrecord2));
+      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_0")).thenReturn(
+          new SegmentZKMetadata(znrecord));
+      when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, 
"myTable_2")).thenReturn(
+          new SegmentZKMetadata(znrecord2));
     }
     {
       _config = mock(ControllerConf.class);
@@ -559,27 +538,27 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.NUMBER_OF_REPLICAS), 2);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.NUMBER_OF_REPLICAS), 2);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_OF_REPLICAS), 100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.TABLE_COMPRESSED_SIZE), 0);
   }
 
   @Test
   public void noReplicas()
       throws Exception {
-    final String tableName = "myTable_REALTIME";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-    IdealState idealState = new IdealState(tableName);
+    String realtimeTableName = "myTable_REALTIME";
+
+    IdealState idealState = new IdealState(realtimeTableName);
     idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE");
     idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE");
     idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE");
@@ -590,11 +569,9 @@ public class SegmentStatusCheckerTest {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -618,26 +595,26 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.NUMBER_OF_REPLICAS), 1);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
+        1);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
+        100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
 
   @Test
-  public void disabledTableTest()
-      throws Exception {
+  public void disabledTableTest() {
+    String offlineTableName = "myTable_OFFLINE";
 
-    final String tableName = "myTable_OFFLINE";
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(offlineTableName);
     // disable table in idealstate
     idealState.enable(false);
     idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE");
@@ -648,11 +625,9 @@ public class SegmentStatusCheckerTest {
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
+      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -669,23 +644,21 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker =
         new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
             _executorService);
+
     // verify state before test
-    Assert.assertEquals(
-        MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 0);
+    assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 0);
+
     // update metrics
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    Assert.assertEquals(
-        MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
+    assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
   }
 
   @Test
-  public void disabledEmptyTableTest()
-      throws Exception {
+  public void disabledEmptyTableTest() {
+    String offlineTableName = "myTable_OFFLINE";
 
-    final String tableName = "myTable_OFFLINE";
-    List<String> allTableNames = Lists.newArrayList(tableName);
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(offlineTableName);
     // disable table in idealstate
     idealState.enable(false);
     idealState.setReplicas("1");
@@ -693,11 +666,9 @@ public class SegmentStatusCheckerTest {
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
+      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -714,14 +685,14 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker =
         new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
             _executorService);
+
     // verify state before test
-    Assert.assertFalse(
-        MetricValueUtils.globalGaugeExists(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT));
+    assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT));
+
     // update metrics
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    Assert.assertEquals(
-        MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
+    assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT), 1);
   }
 
   @Test
@@ -734,22 +705,20 @@ public class SegmentStatusCheckerTest {
 
   @Test
   public void lessThanOnePercentSegmentsUnavailableTest()
-          throws Exception {
-    String tableName = "myTable_OFFLINE";
-    int numSegments = 200;
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
+      throws Exception {
+    String offlineTableName = "myTable_OFFLINE";
     TableConfig tableConfig =
-            new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(1).build();
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build();
 
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(offlineTableName);
+    int numSegments = 200;
     for (int i = 0; i < numSegments; i++) {
       idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE");
     }
     idealState.setReplicas("1");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
-    ExternalView externalView = new ExternalView(tableName);
+    ExternalView externalView = new ExternalView(offlineTableName);
     externalView.setState("myTable_0", "pinot1", "OFFLINE");
     for (int i = 1; i < numSegments; i++) {
       externalView.setState("myTable_" + i, "pinot1", "ONLINE");
@@ -757,19 +726,17 @@ public class SegmentStatusCheckerTest {
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName));
+      
when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig);
+      
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView);
     }
     {
       _helixPropertyStore = mock(ZkHelixPropertyStore.class);
       
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore);
-      SegmentLineage segmentLineage = new SegmentLineage(tableName);
-      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), 
eq(AccessOption.PERSISTENT)))
-              .thenReturn(segmentLineage.toZNRecord());
+      SegmentLineage segmentLineage = new SegmentLineage(offlineTableName);
+      when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), 
any(),
+          
eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord());
     }
     {
       _config = mock(ControllerConf.class);
@@ -788,37 +755,35 @@ public class SegmentStatusCheckerTest {
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _segmentStatusChecker =
-            new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
-                    _executorService);
+        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics,
+            _executorService);
     _segmentStatusChecker.setTableSizeReader(_tableSizeReader);
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
-            ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99);
+
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(),
+        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99);
   }
 
   public void noSegmentsInternal(final int nReplicas)
       throws Exception {
-    final String tableName = "myTable_REALTIME";
+    String realtimeTableName = "myTable_REALTIME";
+
     String nReplicasStr = Integer.toString(nReplicas);
     int nReplicasExpectedValue = nReplicas;
     if (nReplicas < 0) {
       nReplicasStr = "abc";
       nReplicasExpectedValue = 1;
     }
-    List<String> allTableNames = new ArrayList<String>();
-    allTableNames.add(tableName);
-    IdealState idealState = new IdealState(tableName);
+    IdealState idealState = new IdealState(realtimeTableName);
     idealState.setReplicas(nReplicasStr);
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
 
     {
       _helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(_helixResourceManager.getDatabaseNames())
-          .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-      
when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
-      
when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
-      
when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null);
+      
when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
+      
when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
+      
when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null);
     }
     {
       _config = mock(ControllerConf.class);
@@ -843,15 +808,17 @@ public class SegmentStatusCheckerTest {
     _segmentStatusChecker.start();
     _segmentStatusChecker.run();
 
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
tableName,
+    assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.SEGMENTS_IN_ERROR_STATE));
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-        ControllerGauge.NUMBER_OF_REPLICAS), nReplicasExpectedValue);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
-        ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    
Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
tableName,
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS),
+        nReplicasExpectedValue);
+    assertEquals(
+        MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS),
+        100);
+    assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
realtimeTableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 15c3cf6d81..f4e0eb46b1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,7 +34,6 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -89,9 +87,7 @@ public class ControllerPeriodicTaskTest {
   public void beforeTest() {
     List<String> tables = new ArrayList<>(_numTables);
     IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " 
_OFFLINE"));
-    when(_resourceManager.getDatabaseNames())
-        .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-    when(_resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(tables);
+    when(_resourceManager.getAllTables()).thenReturn(tables);
     
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
   }
 
@@ -109,7 +105,6 @@ public class ControllerPeriodicTaskTest {
         _task.getInitialDelayInSeconds() >= 
ControllerConf.ControllerPeriodicTasksConf.MIN_INITIAL_DELAY_IN_SECONDS);
     assertTrue(
         _task.getInitialDelayInSeconds() < 
ControllerConf.ControllerPeriodicTasksConf.MAX_INITIAL_DELAY_IN_SECONDS);
-
     assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS);
   }
 
@@ -124,7 +119,7 @@ public class ControllerPeriodicTaskTest {
     assertFalse(_stopTaskCalled.get());
     assertTrue(_task.isStarted());
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
 
     // Run periodic task with leadership
     resetState();
@@ -133,8 +128,7 @@ public class ControllerPeriodicTaskTest {
     assertTrue(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), _numTables);
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED),
-        _numTables);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables);
     assertFalse(_stopTaskCalled.get());
     assertTrue(_task.isStarted());
 
@@ -145,7 +139,7 @@ public class ControllerPeriodicTaskTest {
     assertFalse(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), 0);
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
     assertTrue(_stopTaskCalled.get());
     assertFalse(_task.isStarted());
 
@@ -156,7 +150,7 @@ public class ControllerPeriodicTaskTest {
     assertFalse(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), 0);
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
     assertFalse(_stopTaskCalled.get());
     assertFalse(_task.isStarted());
 
@@ -169,7 +163,7 @@ public class ControllerPeriodicTaskTest {
     assertFalse(_stopTaskCalled.get());
     assertTrue(_task.isStarted());
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0);
 
     // Run periodic task with leadership
     resetState();
@@ -178,7 +172,7 @@ public class ControllerPeriodicTaskTest {
     assertTrue(_processTablesCalled.get());
     assertEquals(_tablesProcessed.get(), _numTables);
     assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, 
TASK_NAME,
-            ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables);
+        ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables);
     assertFalse(_stopTaskCalled.get());
     assertTrue(_task.isStarted());
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index ce5e31e5ef..b3e656de9e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.retention;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -43,15 +42,13 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.mockito.ArgumentMatchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 public class RetentionManagerTest {
@@ -60,8 +57,7 @@ public class RetentionManagerTest {
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME);
 
-  private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, 
long dayAfterTomorrowTimeStamp)
-      throws Exception {
+  private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, 
long dayAfterTomorrowTimeStamp) {
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
     // Create metadata for 10 segments really old, that will be removed by the 
retention manager.
     final int numOlderSegments = 10;
@@ -105,8 +101,7 @@ public class RetentionManagerTest {
   }
 
   @Test
-  public void testRetentionWithMinutes()
-      throws Exception {
+  public void testRetentionWithMinutes() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60;
     final long pastMinutesSinceEpoch = 22383360L;
@@ -114,8 +109,7 @@ public class RetentionManagerTest {
   }
 
   @Test
-  public void testRetentionWithSeconds()
-      throws Exception {
+  public void testRetentionWithSeconds() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60;
     final long pastSecondsSinceEpoch = 1343001600L;
@@ -123,8 +117,7 @@ public class RetentionManagerTest {
   }
 
   @Test
-  public void testRetentionWithMillis()
-      throws Exception {
+  public void testRetentionWithMillis() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60 * 1000;
     final long pastMillisSinceEpoch = 1343001600000L;
@@ -132,8 +125,7 @@ public class RetentionManagerTest {
   }
 
   @Test
-  public void testRetentionWithHours()
-      throws Exception {
+  public void testRetentionWithHours() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
     final long pastHoursSinceEpoch = 373056L;
@@ -141,8 +133,7 @@ public class RetentionManagerTest {
   }
 
   @Test
-  public void testRetentionWithDays()
-      throws Exception {
+  public void testRetentionWithDays() {
     final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 
60 / 60 / 24 + 2;
     final long pastDaysSinceEpoch = 15544L;
     testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, 
daysSinceEpochTimeStamp);
@@ -161,10 +152,8 @@ public class RetentionManagerTest {
 
   private void setupPinotHelixResourceManager(TableConfig tableConfig, final 
List<String> removedSegments,
       PinotHelixResourceManager resourceManager, LeadControllerManager 
leadControllerManager) {
-    final String tableNameWithType = tableConfig.getTableName();
-    when(resourceManager.getDatabaseNames())
-        .thenReturn(Collections.singletonList(DEFAULT_DATABASE));
-    
when(resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(Collections.singletonList(tableNameWithType));
+    String tableNameWithType = tableConfig.getTableName();
+    
when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType));
 
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
@@ -172,38 +161,27 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
mock(SegmentDeletionManager.class);
     // Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we 
only test that the call is made once per
     // run of the retention manager
-    doAnswer(new Answer() {
-      @Override
-      public Void answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return null;
-      }
-    }).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+    doAnswer(invocationOnMock -> 
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
     
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
 
     // If and when PinotHelixResourceManager.deleteSegments() is invoked, make 
sure that the segments deleted
     // are exactly the same as the ones we expect to be deleted.
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        Object[] args = invocationOnMock.getArguments();
-        String tableNameArg = (String) args[0];
-        Assert.assertEquals(tableNameArg, tableNameWithType);
-        List<String> segmentListArg = (List<String>) args[1];
-        Assert.assertEquals(segmentListArg.size(), removedSegments.size());
-        for (String segmentName : removedSegments) {
-          Assert.assertTrue(segmentListArg.contains(segmentName));
-        }
-        return null;
+    doAnswer(invocationOnMock -> {
+      Object[] args = invocationOnMock.getArguments();
+      String tableNameArg = (String) args[0];
+      assertEquals(tableNameArg, tableNameWithType);
+      List<String> segmentListArg = (List<String>) args[1];
+      assertEquals(segmentListArg.size(), removedSegments.size());
+      for (String segmentName : removedSegments) {
+        assertTrue(segmentListArg.contains(segmentName));
       }
-    }).when(resourceManager).deleteSegments(anyString(), 
ArgumentMatchers.anyList());
+      return null;
+    }).when(resourceManager).deleteSegments(anyString(), anyList());
   }
 
   // This test makes sure that we clean up the segments marked OFFLINE in 
realtime for more than 7 days
   @Test
-  public void testRealtimeLLCCleanup()
-      throws Exception {
+  public void testRealtimeLLCCleanup() {
     final int initialNumSegments = 8;
     final long now = System.currentTimeMillis();
 
@@ -237,8 +215,7 @@ public class RetentionManagerTest {
 
   // This test makes sure that we do not clean up last llc completed segments
   @Test
-  public void testRealtimeLastLLCCleanup()
-      throws Exception {
+  public void testRealtimeLastLLCCleanup() {
     final long now = System.currentTimeMillis();
     final int replicaCount = 1;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to