This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch table_data_manager_cache_config
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 078808af4c92d417004684b44a90566d2b645991
Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com>
AuthorDate: Wed Apr 2 18:48:49 2025 -0700

    Cache TableConfig and Schema in TableDataManager
---
 .../core/data/manager/BaseTableDataManager.java    | 44 +++++++++++++++-------
 .../manager/offline/DimensionTableDataManager.java | 22 ++++-------
 .../provider/DefaultTableDataManagerProvider.java  | 10 +++--
 .../manager/provider/TableDataManagerProvider.java | 12 +++---
 .../manager/realtime/RealtimeTableDataManager.java | 15 ++++----
 .../BaseTableDataManagerAcquireSegmentTest.java    | 10 +++--
 .../data/manager/BaseTableDataManagerTest.java     |  3 +-
 .../offline/DimensionTableDataManagerTest.java     | 12 +++---
 .../realtime/RealtimeSegmentDataManagerTest.java   |  3 +-
 .../executor/QueryExecutorExceptionsTest.java      |  2 +-
 .../core/query/executor/QueryExecutorTest.java     |  2 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  2 +-
 .../queries/SegmentWithNullValueVectorTest.java    |  2 +-
 .../FailureInjectingTableDataManagerProvider.java  | 11 +++---
 .../perf/BenchmarkDimensionTableOverhead.java      |  2 +-
 .../local/data/manager/TableDataManager.java       | 26 ++++++-------
 .../pinot/server/api/resources/TablesResource.java |  5 +--
 .../starter/helix/HelixInstanceDataManager.java    |  7 +++-
 .../apache/pinot/server/api/BaseResourceTest.java  |  3 +-
 19 files changed, 105 insertions(+), 88 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c506dc336c..254b2eb58f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -103,6 +103,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,7 +120,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected HelixManager _helixManager;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
   protected SegmentLocks _segmentLocks;
-  protected TableConfig _tableConfig;
   protected String _tableNameWithType;
   protected String _tableDataDir;
   protected File _indexDir;
@@ -136,6 +136,12 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected boolean _isStreamSegmentDownloadUntar;
   @Nullable
   protected SegmentOperationsThrottler _segmentOperationsThrottler;
+
+  // Caches the latest TableConfig and Schema. The cached TableConfig and 
Schema already have timestamp index applied,
+  // and should not be modified.
+  protected volatile TableConfig _tableConfig;
+  protected volatile Schema _schema;
+
   // Semaphore to restrict the maximum number of parallel segment downloads 
from deep store for a table
   private Semaphore _segmentDownloadSemaphore;
   private AtomicInteger _numSegmentsAcquiredDownloadSemaphore;
@@ -150,7 +156,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
   @Override
   public void init(InstanceDataManagerConfig instanceDataManagerConfig, 
HelixManager helixManager,
-      SegmentLocks segmentLocks, TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
+      SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema, 
SegmentReloadSemaphore segmentReloadSemaphore,
       ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
@@ -161,14 +167,16 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     _helixManager = helixManager;
     _propertyStore = helixManager.getHelixPropertyStore();
     _segmentLocks = segmentLocks;
+    TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     _tableConfig = tableConfig;
+    _schema = schema;
     _segmentReloadSemaphore = segmentReloadSemaphore;
     _segmentReloadExecutor = segmentReloadExecutor;
     _segmentPreloadExecutor = segmentPreloadExecutor;
-    _authProvider = 
AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(),
 null);
+    _authProvider = 
AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(),
 null);
 
     _tableNameWithType = tableConfig.getTableName();
-    _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + 
File.separator + _tableNameWithType;
+    _tableDataDir = instanceDataManagerConfig.getInstanceDataDir() + 
File.separator + _tableNameWithType;
     _indexDir = new File(_tableDataDir);
     if (!_indexDir.exists()) {
       Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index 
directory at %s. "
@@ -379,21 +387,29 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   }
 
   @Override
-  public Pair<TableConfig, Schema> fetchTableConfigAndSchema() {
+  public IndexLoadingConfig fetchIndexLoadingConfig() {
     TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", _tableNameWithType);
-    return Pair.of(tableConfig, schema);
-  }
-
-  @Override
-  public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, 
Schema schema) {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
+    // NOTE: Timestamp index is already applied in the constructor of 
IndexLoadingConfig.
+    _tableConfig = tableConfig;
+    _schema = schema;
     return indexLoadingConfig;
   }
 
+  @Override
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return _schema;
+  }
+
   @Override
   public void addNewOnlineSegment(SegmentZKMetadata zkMetadata, 
IndexLoadingConfig indexLoadingConfig)
       throws Exception {
@@ -1258,10 +1274,12 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   }
 
   @Override
-  public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema 
schema) {
+  public List<StaleSegment> getStaleSegments() {
     List<StaleSegment> staleSegments = new ArrayList<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
-    final long startTime = System.currentTimeMillis();
+    TableConfig tableConfig = _tableConfig;
+    Schema schema = _schema;
+    long startTimeMs = System.currentTimeMillis();
     try {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         StaleSegment response = isSegmentStale(tableConfig, schema, 
segmentDataManager);
@@ -1273,7 +1291,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         releaseSegment(segmentDataManager);
       }
-      LOGGER.info("Time Taken to get stale segments: {} ms", 
System.currentTimeMillis() - startTime);
+      LOGGER.info("Time Taken to get stale segments: {} ms", 
System.currentTimeMillis() - startTimeMs);
     }
 
     return staleSegments;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 5e36d847e4..0c0a7240ab 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,14 +35,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.config.table.DimensionTableConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -108,20 +106,16 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
   @Override
   protected void doInit() {
     super.doInit();
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
 
+    Schema schema = _schema;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
-    if (tableConfig != null) {
-      DimensionTableConfig dimensionTableConfig = 
tableConfig.getDimensionTableConfig();
-      if (dimensionTableConfig != null) {
-        _disablePreload = dimensionTableConfig.isDisablePreload();
-        _errorOnDuplicatePrimaryKey = 
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
-      }
+    DimensionTableConfig dimensionTableConfig = 
_tableConfig.getDimensionTableConfig();
+    if (dimensionTableConfig != null) {
+      _disablePreload = dimensionTableConfig.isDisablePreload();
+      _errorOnDuplicatePrimaryKey = 
dimensionTableConfig.isErrorOnDuplicatePrimaryKey();
     }
 
     if (_disablePreload) {
@@ -206,8 +200,7 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
+    Schema schema = _schema;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
@@ -282,8 +275,7 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
-    Preconditions.checkState(schema != null, "Failed to find schema for 
dimension table: %s", _tableNameWithType);
+    Schema schema = _schema;
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 186736002d..b750016f74 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -64,8 +65,9 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
   }
 
   @Override
-  public TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
@@ -90,8 +92,8 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, segmentReloadSemaphore,
-        segmentReloadExecutor, segmentPreloadExecutor, errorCache, 
_segmentOperationsThrottler);
+    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
+        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, _segmentOperationsThrottler);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 59e5a75f8c..63bfed0b0e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 
 
 /**
@@ -45,14 +46,15 @@ public interface TableDataManagerProvider {
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager 
helixManager, SegmentLocks segmentLocks,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
 
-  TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentRefreshSemaphore,
-      ExecutorService segmentRefreshExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
+      SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService 
segmentRefreshExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries);
 
   @VisibleForTesting
-  default TableDataManager getTableDataManager(TableConfig tableConfig) {
-    return getTableDataManager(tableConfig, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null,
-        null, () -> true);
+  default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
+    return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
+        null, null, () -> true);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 5d212530b2..9c82d43fca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -201,7 +201,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Set up dedup/upsert metadata manager
     // NOTE: Dedup/upsert has to be set up when starting the server. Changing 
the table config without restarting the
     //       server won't enable/disable them on the fly.
-    DedupConfig dedupConfig = _tableConfig.getDedupConfig();
+    TableConfig tableConfig = _tableConfig;
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
     boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
     if (dedupEnabled) {
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
@@ -210,19 +211,19 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
       Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
           "Primary key columns must be configured for dedup");
-      _tableDedupMetadataManager = 
TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, 
_serverMetrics,
+      _tableDedupMetadataManager = 
TableDedupMetadataManagerFactory.create(tableConfig, schema, this, 
_serverMetrics,
           _instanceDataManagerConfig.getDedupConfig());
     }
 
-    UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
+    UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     if (upsertConfig != null && upsertConfig.getMode() != 
UpsertConfig.Mode.NONE) {
       Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both 
enabled for table: %s",
           _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", _tableNameWithType);
       _tableUpsertMetadataManager =
-          TableUpsertMetadataManagerFactory.create(_tableConfig, 
_instanceDataManagerConfig.getUpsertConfig());
-      _tableUpsertMetadataManager.init(_tableConfig, schema, this);
+          TableUpsertMetadataManagerFactory.create(tableConfig, 
_instanceDataManagerConfig.getUpsertConfig());
+      _tableUpsertMetadataManager.init(tableConfig, schema, this);
     }
 
     _enforceConsumptionInOrder = isEnforceConsumptionInOrder();
@@ -594,9 +595,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s 
for segment: %s to be downloaded", status,
         segmentName);
-    TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
-    long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+    long downloadTimeoutMs = getDownloadTimeoutMs(_tableConfig);
     long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
     while (System.currentTimeMillis() < deadlineMs) {
       // ZK Metadata may change during segment download process; fetch it on 
every retry.
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index ad43c7c299..499c3e14a8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -123,12 +124,13 @@ public class BaseTableDataManagerAcquireSegmentTest {
     
when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE);
     
when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-    SegmentOperationsThrottler segmentOperationsThrottler = new 
SegmentOperationsThrottler(
-        new SegmentAllIndexPreprocessThrottler(8, 10, true), new 
SegmentStarTreePreprocessThrottler(4, 8, true),
-        new SegmentDownloadThrottler(10, 20, true));
+    SegmentOperationsThrottler segmentOperationsThrottler =
+        new SegmentOperationsThrottler(new 
SegmentAllIndexPreprocessThrottler(8, 10, true),
+            new SegmentStarTreePreprocessThrottler(4, 8, true), new 
SegmentDownloadThrottler(10, 20, true));
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, segmentOperationsThrottler);
+        mock(Schema.class), new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
+        segmentOperationsThrottler);
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index c992c800da..163cd8a0c5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -664,7 +664,8 @@ public class BaseTableDataManagerTest {
   private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
+        SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
+        SEGMENT_OPERATIONS_THROTTLER);
     return tableDataManager;
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 372d5ce9c2..cd5c0c7c9b 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -89,7 +89,6 @@ public class DimensionTableDataManagerTest {
       new SegmentDownloadThrottler(1, 2, true));
 
   private File _indexDir;
-  private SegmentMetadata _segmentMetadata;
   private SegmentZKMetadata _segmentZKMetadata;
 
   @BeforeClass
@@ -120,9 +119,9 @@ public class DimensionTableDataManagerTest {
 
     String segmentName = driver.getSegmentName();
     _indexDir = new File(tableDataDir, segmentName);
-    _segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_indexDir);
     _segmentZKMetadata = new SegmentZKMetadata(segmentName);
-    _segmentZKMetadata.setCrc(Long.parseLong(_segmentMetadata.getCrc()));
+    _segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
   }
 
   @AfterClass
@@ -174,7 +173,7 @@ public class DimensionTableDataManagerTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     DimensionTableDataManager tableDataManager =
         
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
-    tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig,
+    tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, schema,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
     tableDataManager.start();
     return tableDataManager;
@@ -294,8 +293,9 @@ public class DimensionTableDataManagerTest {
     Schema schemaWithExtraColumn = getSchemaWithExtraColumn();
     when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, 
AccessOption.PERSISTENT)).thenReturn(
         SchemaUtils.toZNRecord(schemaWithExtraColumn));
-    tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(),
-        new IndexLoadingConfig(tableConfig, schemaWithExtraColumn), 
_segmentZKMetadata, _segmentMetadata, false);
+    when(propertyStore.get("/SEGMENTS/dimBaseballTeams_OFFLINE/" + 
_segmentZKMetadata.getSegmentName(), null,
+        AccessOption.PERSISTENT)).thenReturn(_segmentZKMetadata.toZNRecord());
+    tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), false);
 
     // Confirm the new column is available for lookup
     teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 8bf32c576b..887f8885e2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -752,6 +752,7 @@ public class RealtimeSegmentDataManagerTest {
       throws Exception {
     TableConfig tableConfig = createTableConfig();
     tableConfig.setUpsertConfig(null);
+    Schema schema = mock(Schema.class);
     ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
     when(propertyStore.get(anyString(), any(), 
anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
     HelixManager helixManager = mock(HelixManager.class);
@@ -761,7 +762,7 @@ public class RealtimeSegmentDataManagerTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     tableDataManager.shutDown();
     Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 9afed0b634..7c9bd2e4f4 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -136,7 +136,7 @@ public class QueryExecutorExceptionsTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     //we don't add index segments to the data manager to simulate 
numSegmentsAcquired < numSegmentsQueried
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 095a1c0a89..022d134d36 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -160,7 +160,7 @@ public class QueryExecutorTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, schema);
     tableDataManager.start();
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index aa2d9e2cc1..db1af9416c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -279,7 +279,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(TABLE_CONFIG, SCHEMA);
     tableDataManager.start();
     for (IndexSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment((ImmutableSegment) indexSegment);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 1c509ca5e8..ffbd50a3f5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -141,7 +141,7 @@ public class SegmentWithNullValueVectorTest {
     
when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
     TableDataManagerProvider tableDataManagerProvider = new 
DefaultTableDataManagerProvider();
     tableDataManagerProvider.init(instanceDataManagerConfig, 
mock(HelixManager.class), new SegmentLocks(), null);
-    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig);
+    TableDataManager tableDataManager = 
tableDataManagerProvider.getTableDataManager(tableConfig, _schema);
     tableDataManager.start();
     tableDataManager.addSegment(_segment);
     _instanceDataManager = mock(InstanceDataManager.class);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index 7fbc220806..67cca949e9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -62,8 +63,9 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
   }
 
   @Override
-  public TableDataManager getTableDataManager(TableConfig tableConfig, 
SegmentReloadSemaphore segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+  public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
     TableDataManager tableDataManager;
@@ -89,9 +91,8 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, segmentReloadSemaphore,
-        segmentReloadExecutor, segmentPreloadExecutor,
-        errorCache, null);
+    tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
+        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, null);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 3ed62ec35a..d2aa77321d 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -184,7 +184,7 @@ public class BenchmarkDimensionTableOverhead extends 
BaseQueriesTest {
 
     String tableName = TABLE_NAME + "_" + _iteration;
     _tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableName);
-    _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig,
+    _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, SCHEMA,
         new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
     _tableDataManager.start();
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index ed1f96f4da..111c2ec4e5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -53,8 +53,8 @@ public interface TableDataManager {
    * Initializes the table data manager. Should be called only once and before 
calling any other method.
    */
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager 
helixManager, SegmentLocks segmentLocks,
-      TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, 
ExecutorService segmentReloadExecutor,
-      @Nullable ExecutorService segmentPreloadExecutor,
+      TableConfig tableConfig, Schema schema, SegmentReloadSemaphore 
segmentReloadSemaphore,
+      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
 
@@ -295,22 +295,21 @@ public interface TableDataManager {
   SegmentZKMetadata fetchZKMetadata(String segmentName);
 
   /**
-   * Fetches the table config and schema for the table from ZK.
+   * Fetches the table config and schema for the table from ZK, then construct 
the index loading config with them.
    */
-  Pair<TableConfig, Schema> fetchTableConfigAndSchema();
+  IndexLoadingConfig fetchIndexLoadingConfig();
 
   /**
-   * Fetches the table config and schema for the table from ZK, then construct 
the index loading config with them.
+   * Returns the cached latest table config (with timestamp index applied) for 
the table. The cache is refreshed when
+   * invoking {@link #fetchIndexLoadingConfig()}.
    */
-  default IndexLoadingConfig fetchIndexLoadingConfig() {
-    Pair<TableConfig, Schema> tableConfigSchemaPair = 
fetchTableConfigAndSchema();
-    return getIndexLoadingConfig(tableConfigSchemaPair.getLeft(), 
tableConfigSchemaPair.getRight());
-  }
+  TableConfig getTableConfig();
 
   /**
-   * Constructs the index loading config for the table with the given table 
config and schema.
+   * Returns the cached latest schema (with timestamp index applied) for the 
table. The cache is refreshed when invoking
+   * {@link #fetchIndexLoadingConfig()}.
    */
-  IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema 
schema);
+  Schema getSchema();
 
   /**
    * Interface to handle segment state transitions from CONSUMING to DROPPED
@@ -330,9 +329,8 @@ public interface TableDataManager {
 
   /**
    * Return list of segment names that are stale along with reason.
-   * @param tableConfig Table Config of the table
-   * @param schema Schema of the table
+   *
    * @return List of {@link StaleSegment} with segment names and reason why it 
is stale
    */
-  List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema);
+  List<StaleSegment> getStaleSegments();
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index f7038ed11f..3df7a4290e 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -99,11 +99,9 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.api.AdminApiApplication;
 import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -1188,8 +1186,7 @@ public class TablesResource {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     TableDataManager tableDataManager = 
ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
     try {
-      Pair<TableConfig, Schema> tableConfigSchemaPair = 
tableDataManager.fetchTableConfigAndSchema();
-      return 
tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), 
tableConfigSchemaPair.getRight());
+      return tableDataManager.getStaleSegments();
     } catch (Exception e) {
       throw new WebApplicationException(e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
     }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index b79a759e76..b3ed24fa81 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -63,6 +63,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -294,9 +295,11 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
       tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, 
tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
     }
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", tableNameWithType);
     TableDataManager tableDataManager =
-        _tableDataManagerProvider.getTableDataManager(tableConfig, 
_segmentReloadSemaphore, _segmentReloadExecutor,
-            _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);
+        _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
+            _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 1cc0b05b83..89cc6ef292 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -53,6 +53,7 @@ import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.NetUtils;
@@ -204,7 +205,7 @@ public abstract class BaseResourceTest {
     //       table config.
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, null);
+        mock(Schema.class), new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null, null);
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to