This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 667fba096f Cache IndexLoadingConfig in TableDataManager (#15443)
667fba096f is described below

commit 667fba096f0e0c721df1c1a9943ad57a3bacef3f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Apr 4 12:20:56 2025 -0600

    Cache IndexLoadingConfig in TableDataManager (#15443)
---
 .../core/data/manager/BaseTableDataManager.java    |  46 ++--
 .../manager/offline/DimensionTableDataManager.java |  28 +--
 .../provider/DefaultTableDataManagerProvider.java  |  10 +-
 .../manager/provider/TableDataManagerProvider.java |  12 +-
 .../manager/realtime/RealtimeTableDataManager.java |  31 +--
 .../BaseTableDataManagerAcquireSegmentTest.java    |  10 +-
 .../BaseTableDataManagerNeedRefreshTest.java       | 275 ++++++++++++---------
 .../data/manager/BaseTableDataManagerTest.java     |   3 +-
 .../offline/DimensionTableDataManagerTest.java     |  12 +-
 .../realtime/RealtimeSegmentDataManagerTest.java   |  16 +-
 .../executor/QueryExecutorExceptionsTest.java      |   2 +-
 .../core/query/executor/QueryExecutorTest.java     |   2 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |   2 +-
 .../queries/SegmentWithNullValueVectorTest.java    |   2 +-
 .../FailureInjectingTableDataManagerProvider.java  |  11 +-
 .../perf/BenchmarkDimensionTableOverhead.java      |   2 +-
 .../local/data/manager/TableDataManager.java       |  24 +-
 .../pinot/server/api/resources/TablesResource.java |   5 +-
 .../starter/helix/HelixInstanceDataManager.java    |   7 +-
 .../apache/pinot/server/api/BaseResourceTest.java  |  25 +-
 20 files changed, 286 insertions(+), 239 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c506dc336c..c7773c8b09 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -119,7 +119,6 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected HelixManager _helixManager;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected SegmentLocks _segmentLocks;
-  protected TableConfig _tableConfig;
   protected String _tableNameWithType;
   protected String _tableDataDir;
   protected File _indexDir;
@@ -136,9 +135,10 @@ public abstract class BaseTableDataManager implements TableDataManager {
   protected boolean _isStreamSegmentDownloadUntar;
   @Nullable
   protected SegmentOperationsThrottler _segmentOperationsThrottler;
+
   // Semaphore to restrict the maximum number of parallel segment downloads from deep store for a table
-  private Semaphore _segmentDownloadSemaphore;
-  private AtomicInteger _numSegmentsAcquiredDownloadSemaphore;
+  protected Semaphore _segmentDownloadSemaphore;
+  protected AtomicInteger _numSegmentsAcquiredDownloadSemaphore;
 
   // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value.
   @Nullable
@@ -146,11 +146,14 @@ public abstract class BaseTableDataManager implements TableDataManager {
   // Cache used for identifying segments which could not be acquired since they were recently deleted.
   protected Cache<String, String> _recentlyDeletedSegments;
 
+  // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig should not be modified.
+  protected volatile IndexLoadingConfig _indexLoadingConfig;
+
   protected volatile boolean _shutDown;
 
   @Override
   public void init(InstanceDataManagerConfig instanceDataManagerConfig, 
HelixManager helixManager,
-      SegmentLocks segmentLocks, TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
+      SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema, 
SegmentReloadSemaphore segmentReloadSemaphore,
       ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
@@ -161,14 +164,13 @@ public abstract class BaseTableDataManager implements TableDataManager {
     _helixManager = helixManager;
     _propertyStore = helixManager.getHelixPropertyStore();
     _segmentLocks = segmentLocks;
-    _tableConfig = tableConfig;
     _segmentReloadSemaphore = segmentReloadSemaphore;
     _segmentReloadExecutor = segmentReloadExecutor;
     _segmentPreloadExecutor = segmentPreloadExecutor;
-    _authProvider = 
AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(),
 null);
+    _authProvider = 
AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(),
 null);
 
     _tableNameWithType = tableConfig.getTableName();
-    _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + 
File.separator + _tableNameWithType;
+    _tableDataDir = instanceDataManagerConfig.getInstanceDataDir() + 
File.separator + _tableNameWithType;
     _indexDir = new File(_tableDataDir);
     if (!_indexDir.exists()) {
       Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index 
directory at %s. "
@@ -224,6 +226,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
       _numSegmentsAcquiredDownloadSemaphore = null;
     }
     _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
getClass().getSimpleName());
+    createAndCacheIndexLoadingConfig(tableConfig, schema);
 
     doInit();
 
@@ -379,21 +382,26 @@ public abstract class BaseTableDataManager implements TableDataManager {
   }
 
   @Override
-  public Pair<TableConfig, Schema> fetchTableConfigAndSchema() {
+  public IndexLoadingConfig fetchIndexLoadingConfig() {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", _tableNameWithType);
-    return Pair.of(tableConfig, schema);
+    return createAndCacheIndexLoadingConfig(tableConfig, schema);
   }
 
-  @Override
-  public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, 
Schema schema) {
+  private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig 
tableConfig, Schema schema) {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
+    _indexLoadingConfig = indexLoadingConfig;
     return indexLoadingConfig;
   }
 
+  @Override
+  public IndexLoadingConfig getIndexLoadingConfig() {
+    return _indexLoadingConfig;
+  }
+
   @Override
   public void addNewOnlineSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig)
       throws Exception {
@@ -1258,13 +1266,14 @@ public abstract class BaseTableDataManager implements TableDataManager {
   }
 
   @Override
-  public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema 
schema) {
+  public List<StaleSegment> getStaleSegments() {
+    IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     List<StaleSegment> staleSegments = new ArrayList<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
-    final long startTime = System.currentTimeMillis();
+    long startTimeMs = System.currentTimeMillis();
     try {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        StaleSegment response = isSegmentStale(tableConfig, schema, 
segmentDataManager);
+        StaleSegment response = isSegmentStale(indexLoadingConfig, 
segmentDataManager);
         if (response.isStale()) {
           staleSegments.add(response);
         }
@@ -1273,13 +1282,18 @@ public abstract class BaseTableDataManager implements TableDataManager {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         releaseSegment(segmentDataManager);
       }
-      LOGGER.info("Time Taken to get stale segments: {} ms", 
System.currentTimeMillis() - startTime);
+      LOGGER.info("Time Taken to get stale segments: {} ms", 
System.currentTimeMillis() - startTimeMs);
     }
 
     return staleSegments;
   }
 
-  protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema 
schema, SegmentDataManager segmentDataManager) {
+  @VisibleForTesting
+  StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, 
SegmentDataManager segmentDataManager) {
+    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+    Schema schema = indexLoadingConfig.getSchema();
+    assert tableConfig != null && schema != null;
+
     String tableNameWithType = tableConfig.getTableName();
     Map<String, FieldIndexConfigs> indexConfigsMap =
         FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 5e36d847e4..442c62567c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,9 +35,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -108,20 +108,20 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
   @Override
   protected void doInit() {
     super.doInit();
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
 
+    IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
+    Schema schema = indexLoadingConfig.getSchema();
+    assert schema != null;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
-    if (tableConfig != null) {
-      DimensionTableConfig dimensionTableConfig = 
tableConfig.getDimensionTableConfig();
-      if (dimensionTableConfig != null) {
-        _disablePreload = dimensionTableConfig.isDisablePreload();
-        _errorOnDuplicatePrimaryKey = 
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
-      }
+    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+    assert tableConfig != null;
+    DimensionTableConfig dimensionTableConfig = 
tableConfig.getDimensionTableConfig();
+    if (dimensionTableConfig != null) {
+      _disablePreload = dimensionTableConfig.isDisablePreload();
+      _errorOnDuplicatePrimaryKey = 
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
     }
 
     if (_disablePreload) {
@@ -206,8 +206,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
+    Schema schema = _indexLoadingConfig.getSchema();
+    assert schema != null;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
@@ -282,8 +282,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
+    Schema schema = _indexLoadingConfig.getSchema();
+    assert schema != null;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 186736002d..b750016f74 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -64,8 +65,9 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider
   }
 
   @Override
-  public TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
@@ -90,8 +92,8 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, segmentReloadSemaphore,
-        segmentReloadExecutor, segmentPreloadExecutor, errorCache, 
_segmentOperationsThrottler);
+    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
+        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, _segmentOperationsThrottler);
     return tableDataManager;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 59e5a75f8c..63bfed0b0e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -34,6 +34,7 @@ import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 
 
 /**
@@ -45,14 +46,15 @@ public interface TableDataManagerProvider {
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager 
helixManager, SegmentLocks segmentLocks,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
 
-  TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentRefreshSemaphore,
-      ExecutorService segmentRefreshExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
+      SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService 
segmentRefreshExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries);
 
   @VisibleForTesting
-  default TableDataManager getTableDataManager(TableConfig tableConfig) {
-    return getTableDataManager(tableConfig, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null,
-        null, () -> true);
+  default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
+    return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
+        null, null, () -> true);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5d212530b2..0db35b57a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -43,7 +43,6 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
@@ -201,28 +200,30 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     // Set up dedup/upsert metadata manager
     // NOTE: Dedup/upsert has to be set up when starting the server. Changing 
the table config without restarting the
     //       server won't enable/disable them on the fly.
-    DedupConfig dedupConfig = _tableConfig.getDedupConfig();
+    IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
+    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+    assert tableConfig != null;
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
     boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
     if (dedupEnabled) {
-      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
-
+      Schema schema = indexLoadingConfig.getSchema();
+      assert schema != null;
       List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
           "Primary key columns must be configured for dedup");
-      _tableDedupMetadataManager = 
TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, 
_serverMetrics,
+      _tableDedupMetadataManager = 
TableDedupMetadataManagerFactory.create(tableConfig, schema, this, 
_serverMetrics,
           _instanceDataManagerConfig.getDedupConfig());
     }
 
-    UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) {
       Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both 
enabled for table: %s",
           _tableUpsertMetadataManager);
-      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-      Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
+      Schema schema = indexLoadingConfig.getSchema();
+      assert schema != null;
       _tableUpsertMetadataManager =
-          TableUpsertMetadataManagerFactory.create(_tableConfig, 
_instanceDataManagerConfig.getUpsertConfig());
-      _tableUpsertMetadataManager.init(_tableConfig, schema, this);
+          TableUpsertMetadataManagerFactory.create(tableConfig, 
_instanceDataManagerConfig.getUpsertConfig());
+      _tableUpsertMetadataManager.init(tableConfig, schema, this);
     }
 
     _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
@@ -594,8 +595,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s 
for segment: %s to be downloaded", status,
         segmentName);
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
+    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+    assert tableConfig != null;
     long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
     long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
     while (System.currentTimeMillis() < deadlineMs) {
@@ -868,7 +869,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Nullable
   public StreamIngestionConfig getStreamIngestionConfig() {
-    IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
+    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
+    assert tableConfig != null;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
     return ingestionConfig != null ? 
ingestionConfig.getStreamIngestionConfig() : null;
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index ad43c7c299..c861c13fc4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -123,11 +124,12 @@ public class BaseTableDataManagerAcquireSegmentTest {
     
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
     
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-    SegmentOperationsThrottler segmentOperationsThrottler = new 
SegmentOperationsThrottler(
-        new SegmentAllIndexPreprocessThrottler(8, 10, true), new 
SegmentStarTreePreprocessThrottler(4, 8, true),
-        new SegmentDownloadThrottler(10, 20, true));
+    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+    SegmentOperationsThrottler segmentOperationsThrottler =
+        new SegmentOperationsThrottler(new 
SegmentAllIndexPreprocessThrottler(8, 10, true),
+            new SegmentStarTreePreprocessThrottler(4, 8, true), new 
SegmentDownloadThrottler(10, 20, true));
     TableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig,
+    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig, schema,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, segmentOperationsThrottler);
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
index 5c4d4d347b..451d5ed9cf 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -84,6 +84,7 @@ public class BaseTableDataManagerNeedRefreshTest {
 
   private static final TableConfig TABLE_CONFIG;
   private static final Schema SCHEMA;
+  private static final IndexLoadingConfig INDEX_LOADING_CONFIG;
   private static final ImmutableSegmentDataManager 
IMMUTABLE_SEGMENT_DATA_MANAGER;
   private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
 
@@ -93,8 +94,9 @@ public class BaseTableDataManagerNeedRefreshTest {
     try {
       TABLE_CONFIG = getTableConfigBuilder().build();
       SCHEMA = getSchema();
+      INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA);
       IMMUTABLE_SEGMENT_DATA_MANAGER =
-          createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA, 
"basicSegment", generateRows());
+          createImmutableSegmentDataManager(INDEX_LOADING_CONFIG, 
"basicSegment", generateRows());
       BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -103,7 +105,8 @@ public class BaseTableDataManagerNeedRefreshTest {
 
   protected static TableConfigBuilder getTableConfigBuilder() {
     return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
-        
.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME).setNullHandlingEnabled(true)
+        .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
+        .setNullHandlingEnabled(true)
         .setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN));
   }
 
@@ -119,7 +122,8 @@ public class BaseTableDataManagerNeedRefreshTest {
         .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(DISTANCE_COLUMN_NAME, FieldSpec.DataType.INT)
-        .addSingleValueDimension(CARRIER_COLUMN_NAME, 
FieldSpec.DataType.STRING).build();
+        .addSingleValueDimension(CARRIER_COLUMN_NAME, 
FieldSpec.DataType.STRING)
+        .build();
   }
 
   protected static List<GenericRow> generateRows() {
@@ -159,10 +163,10 @@ public class BaseTableDataManagerNeedRefreshTest {
     return List.of(row0, row2, row1);
   }
 
-  private static File createSegment(TableConfig tableConfig, Schema schema,
-      String segmentName, List<GenericRow> rows)
+  private static File createSegment(IndexLoadingConfig indexLoadingConfig, 
String segmentName, List<GenericRow> rows)
       throws Exception {
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    SegmentGeneratorConfig config =
+        new SegmentGeneratorConfig(indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getSchema());
     config.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
     config.setSegmentName(segmentName);
     config.setSegmentVersion(SegmentVersion.v3);
@@ -174,14 +178,12 @@ public class BaseTableDataManagerNeedRefreshTest {
     return new File(TABLE_DATA_DIR, segmentName);
   }
 
-  private static ImmutableSegmentDataManager 
createImmutableSegmentDataManager(TableConfig tableConfig, Schema schema,
+  private static ImmutableSegmentDataManager 
createImmutableSegmentDataManager(IndexLoadingConfig indexLoadingConfig,
       String segmentName, List<GenericRow> rows)
       throws Exception {
     ImmutableSegmentDataManager segmentDataManager = 
mock(ImmutableSegmentDataManager.class);
     when(segmentDataManager.getSegmentName()).thenReturn(segmentName);
-    File indexDir = createSegment(tableConfig, schema, segmentName, rows);
-
-    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
+    File indexDir = createSegment(indexLoadingConfig, segmentName, rows);
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig,
         BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER);
     when(segmentDataManager.getSegment()).thenReturn(immutableSegment);
@@ -196,13 +198,15 @@ public class BaseTableDataManagerNeedRefreshTest {
   @Test
   void testAddTimeColumn()
       throws Exception {
-    TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).setNullHandlingEnabled(true)
-            
.setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build();
-
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME)
+        .setNullHandlingEnabled(true)
+        .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN))
+        .build();
     Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(TEXT_INDEX_COLUMN, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON)
-        .addSingleValueDimension(FST_TEST_COLUMN, 
FieldSpec.DataType.STRING).build();
+        .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING)
+        .build();
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
 
     GenericRow row = new GenericRow();
     row.putValue(TEXT_INDEX_COLUMN, "text_index_column");
@@ -210,23 +214,22 @@ public class BaseTableDataManagerNeedRefreshTest {
     row.putValue(FST_TEST_COLUMN, "fst_test_column");
 
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, schema, "noChanges", 
List.of(row));
+        createImmutableSegmentDataManager(indexLoadingConfig, "noChanges", 
List.of(row));
     BaseTableDataManager tableDataManager = 
BaseTableDataManagerTest.createTableManager();
 
-    StaleSegment response =
-        tableDataManager.isSegmentStale(tableConfig, schema, 
segmentDataManager);
+    StaleSegment response = 
tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager);
     assertFalse(response.isStale());
 
     // Test new time column
-    response = 
tableDataManager.isSegmentStale(getTableConfigBuilder().build(), getSchema(), 
segmentDataManager);
+    response = tableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, 
segmentDataManager);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "time column");
   }
 
   @Test
   void testChangeTimeColumn() {
-    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(
-        
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(), 
SCHEMA,
+    TableConfig tableConfig = 
getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(tableConfig, SCHEMA),
         IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "time column");
@@ -237,8 +240,8 @@ public class BaseTableDataManagerNeedRefreshTest {
       throws Exception {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+        IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "column deleted: textColumn");
   }
@@ -249,9 +252,8 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true));
-
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+        IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "field type changed: textColumn");
   }
@@ -262,9 +264,8 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true));
-
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+        IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "data type changed: textColumn");
   }
@@ -275,9 +276,8 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false));
-
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+        IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "single / multi value changed: textColumn");
   }
@@ -288,9 +288,8 @@ public class BaseTableDataManagerNeedRefreshTest {
     Schema schema = getSchema();
     schema.removeField(TEXT_INDEX_COLUMN_MV);
     schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true));
-
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema),
+        IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "single / multi value changed: textColumnMV");
   }
@@ -298,16 +297,14 @@ public class BaseTableDataManagerNeedRefreshTest {
   @Test
   void testSortColumnMismatch() {
     // Check with a column that is not sorted
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(
-            getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(),
-            SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    TableConfig tableConfig = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "sort column changed: MilliSecondsSinceEpoch");
     // Check with a column that is sorted
-    assertFalse(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build(),
-            SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
+    tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN));
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
   }
 
   @DataProvider(name = "testFilterArgs")
@@ -325,9 +322,9 @@ public class BaseTableDataManagerNeedRefreshTest {
             null, null))).build(), "text index changed: textColumn"
     }, {
         "withFstIndex", getTableConfigBuilder().setFieldConfigList(List.of(
-        new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST),
-            null, Map.of(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(),
-        "fst index changed: DestCityName"
+        new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST), null,
+            Map.of(FieldConfig.TEXT_FST_TYPE,
+                FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(), "fst index changed: DestCityName"
     }, {
         "withRangeFilter", getTableConfigBuilder().setRangeIndexColumns(
         List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build(), "range index changed: MilliSecondsSinceEpoch"
@@ -338,22 +335,22 @@ public class BaseTableDataManagerNeedRefreshTest {
   @Test(dataProvider = "testFilterArgs")
   void testFilter(String segmentName, TableConfig tableConfigWithFilter, String expectedReason)
       throws Exception {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfigWithFilter, SCHEMA);
     ImmutableSegmentDataManager segmentWithFilter =
-        createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows());
+        createImmutableSegmentDataManager(indexLoadingConfig, segmentName, generateRows());
 
     // When TableConfig has a filter but segment does not have, needRefresh is true.
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), expectedReason);
 
     // When TableConfig does not have a filter but segment has, needRefresh is true
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithFilter);
+    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), expectedReason);
 
     // When TableConfig has a filter AND segment also has a filter, needRefresh is false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, segmentWithFilter).isStale());
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale());
   }
 
   @Test
@@ -361,23 +358,23 @@ public class BaseTableDataManagerNeedRefreshTest {
       throws Exception {
     TableConfig partitionedTableConfig = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
         Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS)))).build();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(partitionedTableConfig, SCHEMA);
     ImmutableSegmentDataManager segmentWithPartition =
-        createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA, "partitionWithModulo", generateRows());
+        createImmutableSegmentDataManager(indexLoadingConfig, "partitionWithModulo", generateRows());
 
     // when segment has no partition AND tableConfig has partitions then needRefresh = true
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "partition function added: partitionedColumn");
 
     // when segment has partitions AND tableConfig has no partitions, then needRefresh = false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithPartition).isStale());
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale());
 
     // when # of partitions is different, then needRefresh = true
     TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
         Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
-
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig40, SCHEMA, segmentWithPartition);
+    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA),
+        segmentWithPartition);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "num partitions changed: partitionedColumn");
 
@@ -385,8 +382,8 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig(
         new SegmentPartitionConfig(
             Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
-
-    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfigMurmur, SCHEMA, segmentWithPartition);
+    response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA),
+        segmentWithPartition);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "partition function name changed: partitionedColumn");
   }
@@ -395,36 +392,33 @@ public class BaseTableDataManagerNeedRefreshTest {
   void testNullValueVector()
       throws Exception {
     TableConfig withoutNullHandling = getTableConfigBuilder().setNullHandlingEnabled(false).build();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(withoutNullHandling, SCHEMA);
     ImmutableSegmentDataManager segmentWithoutNullHandling =
-        createImmutableSegmentDataManager(withoutNullHandling, SCHEMA, "withoutNullHandling", generateRows());
+        createImmutableSegmentDataManager(indexLoadingConfig, "withoutNullHandling", generateRows());
 
     // If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true
-    StaleSegment response =
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER);
     assertTrue(response.isStale());
     assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn");
 
     // if NVV is added to table config AND segment does not have NVV, then it cannot be added. needRefresh = false
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithoutNullHandling).isStale());
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale());
   }
 
   @Test
-  // Test 1 : Adding a StarTree index should trigger segment refresh.
-  public void addStartreeIndex()
-      throws Exception {
+  public void addStartreeIndex() {
+    // Test 1 : Adding a StarTree index should trigger segment refresh.
+
     StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
-    ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(getTableConfigBuilder().build(), SCHEMA, _testName, generateRows());
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, segmentDataManager).isStale());
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA),
+        IMMUTABLE_SEGMENT_DATA_MANAGER).isStale());
   }
 
   @Test
   public void testStarTreeIndexWithDifferentColumn()
       throws Exception {
-
     // Test 2: Adding a new StarTree index with split dimension column of same size but with different element should
     // trigger segment refresh.
 
@@ -433,20 +427,21 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     // Create a StarTree index on Distance.
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig newTableConfig =
         getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale());
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
   public void testStarTreeIndexWithManyColumns()
       throws Exception {
-
     // Test 3: Adding a new StarTree index with split dimension columns of different size should trigger segment
     // refresh.
 
@@ -455,19 +450,20 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig newTableConfig =
         getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale());
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
   public void testStartIndexWithDifferentOrder()
       throws Exception {
-
     // Test 4: Adding a new StarTree index with the differently ordered split dimension columns should trigger
     // segment refresh.
 
@@ -476,33 +472,39 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     // Create a StarTree index.
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Distance", "Carrier"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
   void testStarTreeIndexWithSkipDimCols()
       throws Exception {
-
     // Test 5: Adding a new StarTree index with skipped dimension columns should trigger segment refresh.
     // Create a segment with StarTree index on Carrier, Distance.
+
     StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     // Create a StarTree index.
     StarTreeIndexConfig newStarTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
             Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -510,35 +512,43 @@ public class BaseTableDataManagerNeedRefreshTest {
       throws Exception {
     // Test 6: Adding a new StarTree index with skipped dimension columns in different order should not trigger
     // segment refresh.
+
     StarTreeIndexConfig starTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
             Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Distance", "Carrier"),
             Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertFalse(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
   void testStarTreeIndexRemoveSkipDimCols()
       throws Exception {
     // Test 7: Adding a new StarTree index with removed skipped-dimension column should trigger segment refresh.
+
     StarTreeIndexConfig starTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
             Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -548,15 +558,17 @@ public class BaseTableDataManagerNeedRefreshTest {
 
     StarTreeIndexConfig starTreeIndex = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndex)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
-    StarTreeIndexConfig starTreeIndexAddAggFn = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+    StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), "MAX__Distance"), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexAddAggFn)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -569,12 +581,15 @@ public class BaseTableDataManagerNeedRefreshTest {
         Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), "MAX__Distance"), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Arrays.asList("MAX__Distance", AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertFalse(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -587,32 +602,39 @@ public class BaseTableDataManagerNeedRefreshTest {
 
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null,
             List.of(new StarTreeAggregationConfig("Distance", "MAX")), 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
   void testStarTreeIndexNewMetricAgg()
       throws Exception {
     // Test 11 : Adding a new metric aggregation function through functionColumnPairs should trigger segment refresh.
+
     StarTreeAggregationConfig aggregationConfig = new StarTreeAggregationConfig("Distance", "MAX");
     StarTreeIndexConfig starTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, List.of(aggregationConfig), 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     // Create a StarTree index.
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
         Collections.singletonList(aggregationConfig), 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -627,14 +649,17 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(aggregationConfig), 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeAggregationConfig starTreeAggregationConfig2 = new StarTreeAggregationConfig("*", "count");
     StarTreeIndexConfig newStarTreeIndexConfig =
         new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null,
             Arrays.asList(starTreeAggregationConfig2, aggregationConfig), 100);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertFalse(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -644,12 +669,15 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 10);
-    TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    TableConfig newTableConfig =
+        getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -659,9 +687,8 @@ public class BaseTableDataManagerNeedRefreshTest {
         Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
     TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
-    assertTrue(
-        BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().build(), SCHEMA, segmentDataManager).isStale());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows());
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale());
   }
 
   @Test
@@ -673,13 +700,15 @@ public class BaseTableDataManagerNeedRefreshTest {
         
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, 
SCHEMA), _testName, generateRows());
 
     StarTreeIndexConfig newStarTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Distance"), null,
         
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
     TableConfig newTableConfig =
         
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, 
newStarTreeIndexConfig)).build();
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -691,11 +720,13 @@ public class BaseTableDataManagerNeedRefreshTest {
         
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, 
SCHEMA), _testName, generateRows());
 
     TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 
   @Test
@@ -706,10 +737,10 @@ public class BaseTableDataManagerNeedRefreshTest {
     StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
         
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, SCHEMA);
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
-
-    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, 
segmentDataManager).isStale());
+        createImmutableSegmentDataManager(indexLoadingConfig, _testName, 
generateRows());
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, 
segmentDataManager).isStale());
   }
 
   @Test
@@ -722,10 +753,12 @@ public class BaseTableDataManagerNeedRefreshTest {
     TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     tableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
     ImmutableSegmentDataManager segmentDataManager =
-        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+        createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, 
SCHEMA), _testName, generateRows());
 
     TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
     newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
-    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+    assertTrue(
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(new 
IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager)
+            .isStale());
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index c992c800da..163cd8a0c5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -664,7 +664,8 @@ public class BaseTableDataManagerTest {
   private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
+        SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
+        SEGMENT_OPERATIONS_THROTTLER);
     return tableDataManager;
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 372d5ce9c2..cd5c0c7c9b 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -89,7 +89,6 @@ public class DimensionTableDataManagerTest {
       new SegmentDownloadThrottler(1, 2, true));
 
   private File _indexDir;
-  private SegmentMetadata _segmentMetadata;
   private SegmentZKMetadata _segmentZKMetadata;
 
   @BeforeClass
@@ -120,9 +119,9 @@ public class DimensionTableDataManagerTest {
 
     String segmentName = driver.getSegmentName();
     _indexDir = new File(tableDataDir, segmentName);
-    _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_indexDir);
     _segmentZKMetadata = new SegmentZKMetadata(segmentName);
-    _segmentZKMetadata.setCrc(Long.parseLong(_segmentMetadata.getCrc()));
+    _segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
   }
 
   @AfterClass
@@ -174,7 +173,7 @@ public class DimensionTableDataManagerTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     DimensionTableDataManager tableDataManager =
         
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
-    tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig,
+    tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, schema,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
     tableDataManager.start();
     return tableDataManager;
@@ -294,8 +293,9 @@ public class DimensionTableDataManagerTest {
     Schema schemaWithExtraColumn = getSchemaWithExtraColumn();
     when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, 
AccessOption.PERSISTENT)).thenReturn(
         SchemaUtils.toZNRecord(schemaWithExtraColumn));
-    tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(),
-        new IndexLoadingConfig(tableConfig, schemaWithExtraColumn), 
_segmentZKMetadata, _segmentMetadata, false);
+    when(propertyStore.get("/SEGMENTS/dimBaseballTeams_OFFLINE/" + 
_segmentZKMetadata.getSegmentName(), null,
+        AccessOption.PERSISTENT)).thenReturn(_segmentZKMetadata.toZNRecord());
+    tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), false);
 
     // Confirm the new column is available for lookup
     teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 8bf32c576b..1aa0367421 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -35,12 +35,10 @@ import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.config.TableConfigUtils;
 import 
org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -74,7 +72,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -750,18 +747,13 @@ public class RealtimeSegmentDataManagerTest {
   @Test
   public void 
testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
       throws Exception {
-    TableConfig tableConfig = createTableConfig();
-    tableConfig.setUpsertConfig(null);
-    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
-    when(propertyStore.get(anyString(), any(), 
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
-    HelixManager helixManager = mock(HelixManager.class);
-    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
-
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
-    tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
+    TableConfig tableConfig = createTableConfig();
+    Schema schema = Fixtures.createSchema();
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     tableDataManager.shutDown();
     Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 9afed0b634..7c9bd2e4f4 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -136,7 +136,7 @@ public class QueryExecutorExceptionsTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 095a1c0a89..022d134d36 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -160,7 +160,7 @@ public class QueryExecutorTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index aa2d9e2cc1..db1af9416c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -279,7 +279,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG, SCHEMA);
     tableDataManager.start();
     for (IndexSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment((ImmutableSegment) indexSegment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 1c509ca5e8..ffbd50a3f5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -141,7 +141,7 @@ public class SegmentWithNullValueVectorTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, _schema);
     tableDataManager.start();
     tableDataManager.addSegment(_segment);
     _instanceDataManager = mock(InstanceDataManager.class);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index 7fbc220806..67cca949e9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -62,8 +63,9 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
   }
 
   @Override
-  public TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
@@ -89,9 +91,8 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, segmentReloadSemaphore,
-        segmentReloadExecutor, segmentPreloadExecutor,
-        errorCache, null);
+    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
+        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, null);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 3ed62ec35a..d2aa77321d 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -184,7 +184,7 @@ public class BenchmarkDimensionTableOverhead extends 
BaseQueriesTest {
 
     String tableName = TABLE_NAME + "_" + _iteration;
     _tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableName);
-    _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig,
+    _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, SCHEMA,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
     _tableDataManager.start();
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index ed1f96f4da..e8351e5cbe 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -53,8 +53,8 @@ public interface TableDataManager {
    * Initializes the table data manager. Should be called only once and before calling any other method.
    */
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager 
helixManager, SegmentLocks segmentLocks,
-      TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, 
ExecutorService segmentReloadExecutor,
-      @Nullable ExecutorService segmentPreloadExecutor,
+      TableConfig tableConfig, Schema schema, SegmentReloadSemaphore 
segmentReloadSemaphore,
+      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
 
@@ -294,23 +294,16 @@ public interface TableDataManager {
    */
   SegmentZKMetadata fetchZKMetadata(String segmentName);
 
-  /**
-   * Fetches the table config and schema for the table from ZK.
-   */
-  Pair<TableConfig, Schema> fetchTableConfigAndSchema();
-
   /**
    * Fetches the table config and schema for the table from ZK, then construct the index loading config with them.
    */
-  default IndexLoadingConfig fetchIndexLoadingConfig() {
-    Pair<TableConfig, Schema> tableConfigSchemaPair = 
fetchTableConfigAndSchema();
-    return getIndexLoadingConfig(tableConfigSchemaPair.getLeft(), 
tableConfigSchemaPair.getRight());
-  }
+  IndexLoadingConfig fetchIndexLoadingConfig();
 
   /**
-   * Constructs the index loading config for the table with the given table config and schema.
+   * Returns the cached latest {@link IndexLoadingConfig} for the table. The cache is refreshed when invoking
+   * {@link #fetchIndexLoadingConfig()}.
    */
-  IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema 
schema);
+  IndexLoadingConfig getIndexLoadingConfig();
 
   /**
    * Interface to handle segment state transitions from CONSUMING to DROPPED
@@ -330,9 +323,8 @@ public interface TableDataManager {
 
   /**
    * Return list of segment names that are stale along with reason.
-   * @param tableConfig Table Config of the table
-   * @param schema Schema of the table
+   *
    * @return List of {@link StaleSegment} with segment names and reason why it is stale
    */
-  List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema);
+  List<StaleSegment> getStaleSegments();
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index f7038ed11f..3df7a4290e 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -99,11 +99,9 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.api.AdminApiApplication;
 import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -1188,8 +1186,7 @@ public class TablesResource {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     TableDataManager tableDataManager = 
ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
     try {
-      Pair<TableConfig, Schema> tableConfigSchemaPair = 
tableDataManager.fetchTableConfigAndSchema();
-      return 
tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), 
tableConfigSchemaPair.getRight());
+      return tableDataManager.getStaleSegments();
     } catch (Exception e) {
       throw new WebApplicationException(e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index b79a759e76..b3ed24fa81 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -63,6 +63,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -294,9 +295,11 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
       tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, 
tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
     }
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", tableNameWithType);
     TableDataManager tableDataManager =
-        _tableDataManagerProvider.getTableDataManager(tableConfig, 
_segmentReloadSemaphore, _segmentReloadExecutor,
-            _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);
+        _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
+            _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 1cc0b05b83..9967cade27 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -51,15 +51,16 @@ import 
org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
@@ -68,6 +69,8 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 public abstract class BaseResourceTest {
@@ -98,9 +101,9 @@ public abstract class BaseResourceTest {
     ServerMetrics.register(mock(ServerMetrics.class));
 
     FileUtils.deleteQuietly(TEMP_DIR);
-    Assert.assertTrue(TEMP_DIR.mkdirs());
+    assertTrue(TEMP_DIR.mkdirs());
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
-    Assert.assertNotNull(resourceUrl);
+    assertNotNull(resourceUrl);
     _avroFile = new File(resourceUrl.getFile());
 
     // Mock the instance data manager
@@ -197,13 +200,15 @@ public abstract class BaseResourceTest {
   protected void addTable(String tableNameWithType) {
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    TableConfig tableConfig = mock(TableConfig.class);
-    when(tableConfig.getTableName()).thenReturn(tableNameWithType);
-    
when(tableConfig.getValidationConfig()).thenReturn(mock(SegmentsValidationAndRetentionConfig.class));
-    // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager requires
-    //       table config.
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    assertNotNull(tableType);
+    TableConfig tableConfig = new 
TableConfigBuilder(tableType).setTableName(tableNameWithType).build();
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TableNameBuilder.extractRawTableName(tableNameWithType)).build();
+    // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager performs
+    //       more checks
     TableDataManager tableDataManager = new OfflineTableDataManager();
-    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig,
+    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig, schema,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, null);
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to