This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4876fb754d Add a Server API to list segments that need to be refreshed 
for a table (#14451)
4876fb754d is described below

commit 4876fb754d5f22c1964323c6e114192ef13cea35
Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com>
AuthorDate: Wed Dec 4 11:12:59 2024 +0530

    Add a Server API to list segments that need to be refreshed for a table 
(#14451)
    
    * checkpoint
    
    * Create API to get segments needing refresh
    
    * Add controller API
    
    * Checkstyle
    
    * Checkpoint tests
    
    * Consolidate setup to static variables
    
    * Checkstyle
    
    * Add more index tests
    
    * Partition tests
    
    * Add NullVectorValue Test
    
    * Add NeedRefreshResponse with reason
    
    * Add NeedRefreshResponse with reason
    
    * Checkstyle & spotless
    
    * Fix Jackson serde
    
    * Fix FST Index test
    
    * Checkpoint H3 experiments
    
    * Checkpoint tests
    
    * Fix all tests
    
    * Doc strings
    
    * Add column name to debug logs
    
    * Release segments
    
    * Add table name to message.
    
    * Use instance name instead of url.
    
    * Use Stale instead of needRefresh
    
    * Add TableStaleSegmentResponse
    
    * Checkstyle fixes
    
    * Log time taken to get stale segments in server
    
    * Add tests for startree index
---
 .../api/resources/PinotSegmentRestletResource.java |  25 +
 .../api/resources/TableStaleSegmentResponse.java   |  67 ++
 .../util/ServerSegmentMetadataReader.java          |  35 +
 .../pinot/controller/util/TableMetadataReader.java |  14 +
 .../core/data/manager/BaseTableDataManager.java    | 288 ++++++++
 .../BaseTableDataManagerNeedRefreshTest.java       | 730 +++++++++++++++++++++
 .../data/manager/BaseTableDataManagerTest.java     |   2 +-
 .../tests/StaleSegmentCheckIntegrationTest.java    | 201 ++++++
 .../segment/local/data/manager/StaleSegment.java   |  61 ++
 .../local/data/manager/TableDataManager.java       |   8 +
 .../pinot/server/api/resources/TablesResource.java |  25 +
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 12 files changed, 1459 insertions(+), 1 deletion(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index b114aa8844..7499098780 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -893,6 +893,31 @@ public class PinotSegmentRestletResource {
     }
   }
 
+  @GET
+  @Path("segments/{tableNameWithType}/isStale")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Gets a list of segments that are stale from servers 
hosting the table",
+      notes = "Gets a list of segments that are stale from servers hosting the 
table")
+  public Map<String, TableStaleSegmentResponse> getStaleSegments(
+      @ApiParam(value = "Table name with type", required = true, example = 
"myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType, @Context 
HttpHeaders headers) {
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+    LOGGER.info("Received a request to check for segments requiring a refresh 
from all servers hosting segments for "
+        + "table {}", tableNameWithType);
+    try {
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      return tableMetadataReader.getStaleSegments(tableNameWithType,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Status.BAD_REQUEST);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot 
server response: " + ioe.getMessage(),
+          Status.INTERNAL_SERVER_ERROR, ioe);
+    }
+  }
+
   @GET
   @Path("segments/{tableName}/zkmetadata")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.GET_METADATA)
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
new file mode 100644
index 0000000000..eead74dd8f
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableStaleSegmentResponse.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
+
+
+/**
+ * Outcome of asking one server for the stale segments of a table.
+ *
+ * <p>An instance carries either a list of {@link StaleSegment} entries (valid response, no error message) or an
+ * error message (invalid response, no list). The JSON property names used for deserialization are
+ * {@code staleSegmentList}, {@code validResponse} and {@code errorMessage}.
+ */
+public class TableStaleSegmentResponse {
+  private final List<StaleSegment> _staleSegmentList;
+  private final boolean _isValidResponse;
+  private final String _errorMessage;
+
+  /**
+   * Canonical constructor, also used by Jackson when deserializing.
+   *
+   * @param staleSegmentList stale segments reported by the server; may be {@code null}
+   * @param isValidResponse whether the server response could be interpreted
+   * @param errorMessage error description; may be {@code null}
+   */
+  @JsonCreator
+  public TableStaleSegmentResponse(@JsonProperty("staleSegmentList") List<StaleSegment> staleSegmentList,
+      @JsonProperty("validResponse") boolean isValidResponse,
+      @JsonProperty("errorMessage") String errorMessage) {
+    _staleSegmentList = staleSegmentList;
+    _isValidResponse = isValidResponse;
+    _errorMessage = errorMessage;
+  }
+
+  /**
+   * Builds a valid response: the given list, {@code validResponse == true}, and no error message.
+   */
+  public TableStaleSegmentResponse(List<StaleSegment> staleSegmentList) {
+    this(staleSegmentList, true, null);
+  }
+
+  /**
+   * Builds an invalid response: no segment list, {@code validResponse == false}, and the given error message.
+   */
+  public TableStaleSegmentResponse(String errorMessage) {
+    this(null, false, errorMessage);
+  }
+
+  /** Returns the stale segments, or {@code null} when this instance was built from an error message. */
+  @JsonProperty
+  public List<StaleSegment> getStaleSegmentList() {
+    return _staleSegmentList;
+  }
+
+  /** Returns {@code true} when this instance was built as a valid response. */
+  @JsonProperty
+  public boolean isValidResponse() {
+    return _isValidResponse;
+  }
+
+  /** Returns the error message, or {@code null} when this instance was built from a segment list. */
+  @JsonProperty
+  public String getErrorMessage() {
+    return _errorMessage;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 781140a978..8dde7f08fe 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -47,6 +47,8 @@ import 
org.apache.pinot.common.restlet.resources.TableSegments;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
@@ -397,6 +399,34 @@ public class ServerSegmentMetadataReader {
     return response;
   }
 
+  public Map<String, TableStaleSegmentResponse> getStaleSegmentsFromServer(
+      String tableNameWithType, Set<String> serverInstances, BiMap<String, 
String> endpoints, int timeoutMs) {
+    LOGGER.debug("Getting list of segments for refresh from servers for table 
{}.", tableNameWithType);
+    List<String> serverURLs = new ArrayList<>();
+    for (String serverInstance : serverInstances) {
+      serverURLs.add(generateStaleSegmentsServerURL(tableNameWithType, 
endpoints.get(serverInstance)));
+    }
+    BiMap<String, String> endpointsToServers = endpoints.inverse();
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
endpointsToServers);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverURLs, 
tableNameWithType, false, timeoutMs);
+    Map<String, TableStaleSegmentResponse> serverResponses = new HashMap<>();
+
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        List<StaleSegment> staleSegments = 
JsonUtils.stringToObject(streamResponse.getValue(),
+            new TypeReference<List<StaleSegment>>() { });
+        serverResponses.put(streamResponse.getKey(), new 
TableStaleSegmentResponse(staleSegments));
+      } catch (Exception e) {
+        serverResponses.put(streamResponse.getKey(), new 
TableStaleSegmentResponse(e.getMessage()));
+        LOGGER.error("Unable to parse server {} response for needRefresh for 
table {} due to an error: ",
+            streamResponse.getKey(), tableNameWithType, e);
+      }
+    }
+    return serverResponses;
+  }
+
   private String generateAggregateSegmentMetadataServerURL(String 
tableNameWithType, List<String> columns,
       String endpoint) {
     tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
@@ -470,4 +500,9 @@ public class ServerSegmentMetadataReader {
     paramsStr = String.join("&", params);
     return paramsStr;
   }
+
+  /**
+   * Builds the server URL of the stale-segments endpoint for a table.
+   *
+   * @param tableNameWithType table name with type suffix; URL-encoded as UTF-8 before being placed in the path
+   * @param endpoint server admin endpoint used as the URL prefix
+   * @return {@code <endpoint>/tables/<encoded table name>/segments/isStale}
+   */
+  private String generateStaleSegmentsServerURL(String tableNameWithType, String endpoint) {
+    String encodedTableName = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
+    return String.format("%s/tables/%s/segments/isStale", endpoint, encodedTableName);
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index c9e87b396b..48f53577a8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -33,6 +33,7 @@ import 
org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -199,4 +200,17 @@ public class TableMetadataReader {
             segmentNames, timeoutMs, validDocIdsType, 
numSegmentsBatchPerServerRequest);
     return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
   }
+
+  /**
+   * Collects the stale segments of a table from the servers hosting it.
+   *
+   * <p>Resolves the server instances for the table and their admin endpoints through the Helix resource manager,
+   * then delegates to {@code ServerSegmentMetadataReader#getStaleSegmentsFromServer}.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param timeoutMs timeout for the server requests, in milliseconds
+   * @return the per-server responses returned by the server segment metadata reader
+   * @throws InvalidConfigException declared by this method; see the resource manager calls it makes
+   * @throws IOException declared by this method
+   */
+  public Map<String, TableStaleSegmentResponse> getStaleSegments(String tableNameWithType,
+      int timeoutMs)
+      throws InvalidConfigException, IOException {
+    TableType typeOfTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    // De-duplicate the instances before resolving their admin endpoints.
+    Set<String> hostingServers =
+        new HashSet<>(_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, typeOfTable));
+    BiMap<String, String> adminEndpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(hostingServers);
+    ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+    return reader.getStaleSegmentsFromServer(tableNameWithType, hostingServers, adminEndpoints, timeoutMs);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 10ff609b44..e3e17a6f4d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -30,6 +30,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -59,25 +62,40 @@ import 
org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
+import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
+import 
org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.SegmentContext;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -1046,6 +1064,276 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     return needReload;
   }
 
+  /**
+   * Checks every segment currently held by this table data manager against the given table config and schema and
+   * returns those found stale.
+   *
+   * <p>All segments are acquired up front and always released again, even if a check throws. The elapsed time of
+   * the checks is logged at INFO level.
+   *
+   * @param tableConfig table config to compare the segments against
+   * @param schema schema to compare the segments against
+   * @return the {@link StaleSegment} results whose {@code isStale()} is {@code true}; empty if none
+   */
+  @Override
+  public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema) {
+    List<StaleSegment> staleOnly = new ArrayList<>();
+    List<SegmentDataManager> acquired = acquireAllSegments();
+    final long checkStartMs = System.currentTimeMillis();
+    try {
+      for (SegmentDataManager acquiredSegment : acquired) {
+        StaleSegment verdict = isSegmentStale(tableConfig, schema, acquiredSegment);
+        if (!verdict.isStale()) {
+          continue;
+        }
+        staleOnly.add(verdict);
+      }
+    } finally {
+      // Every acquired segment must be released, whatever happened during the checks.
+      acquired.forEach(this::releaseSegment);
+      LOGGER.info("Time Taken to get stale segments: {} ms", System.currentTimeMillis() - checkStartMs);
+    }
+
+    return staleOnly;
+  }
+
+
+  /**
+   * Compares one loaded segment against the given table config and schema and reports whether it is stale.
+   *
+   * <p>The checks run in a fixed order and the method returns on the first mismatch, so the reason in the result
+   * names only the first difference found. Checks performed, in order:
+   * <ol>
+   *   <li>time column configured on the table differs from the one in the segment metadata;</li>
+   *   <li>the schema has a physical column that the segment does not contain;</li>
+   *   <li>star-tree builder configs derived from the table config differ from those derived from the segment;</li>
+   *   <li>per physical column of the segment: column removed from the schema, field type, data type, single/multi
+   *       value, dictionary encoding, sorted column, bloom filter, json, text, FST, H3, inverted, null value vector,
+   *       range index and partitioning.</li>
+   * </ol>
+   *
+   * @param tableConfig table config to compare against
+   * @param schema schema to compare against
+   * @param segmentDataManager manager of the segment to inspect
+   * @return a {@link StaleSegment} for the segment; stale with a reason on the first mismatch, otherwise not stale
+   *     with a {@code null} reason
+   */
+  protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema schema,
+      SegmentDataManager segmentDataManager) {
+    String tableNameWithType = tableConfig.getTableName();
+    Map<String, FieldIndexConfigs> indexConfigsMap =
+        FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
+
+    String segmentName = segmentDataManager.getSegmentName();
+    IndexSegment segment = segmentDataManager.getSegment();
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames();
+
+    // Time column changed
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (timeColumn != null) {
+      if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName);
+        return new StaleSegment(segmentName, true, "time column");
+      }
+    }
+
+    // Only the first configured sorted column is considered.
+    List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
+    String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? sortedColumns.get(0) : null;
+
+    String partitionColumn = null;
+    ColumnPartitionConfig partitionConfig = null;
+    SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    // NOTE: Partition can only be enabled on a single column
+    if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) {
+      Map.Entry<String, ColumnPartitionConfig> entry =
+          segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+      partitionColumn = entry.getKey();
+      partitionConfig = entry.getValue();
+    }
+
+    Set<String> columnsInSegment = segmentMetadata.getAllColumns();
+
+    // Column is added
+    if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) {
+      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName);
+      return new StaleSegment(segmentName, true, "column added");
+    }
+
+    // Get Index configuration for the Table Config
+    Set<String> noDictionaryColumns =
+        FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap);
+    Set<String> bloomFilters =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap);
+    Set<String> jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap);
+    Set<String> invertedIndex =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap);
+    Set<String> nullValueVectorIndex =
+        FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap);
+    Set<String> rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap);
+    Set<String> h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap);
+    Set<String> fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap);
+    Set<String> textIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap);
+    List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig =
+        tableConfig.getIndexingConfig().getStarTreeIndexConfigs();
+
+    // Get the StarTree indexes currently loaded in the segment (may be null).
+    List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees();
+
+    // Generate StarTree index builder config from the segment metadata.
+    List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new ArrayList<>();
+    if (starTreeIndexMetadata != null) {
+      for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) {
+        builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata()));
+      }
+    }
+
+    // Generate StarTree index builder configs from the table config.
+    List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs =
+        StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig,
+            tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata);
+
+    // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment
+    // metadata.
+    if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs,
+        builderConfigFromSegmentMetadata)) {
+      // Logged like every other mismatch so that debug output covers all stale reasons.
+      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: startree index", tableNameWithType, segmentName);
+      return new StaleSegment(segmentName, true, "startree index");
+    }
+
+    for (String columnName : segmentPhysicalColumns) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName);
+      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
+      DataSource source = segment.getDataSource(columnName);
+      Preconditions.checkNotNull(columnMetadata);
+      Preconditions.checkNotNull(source);
+
+      // Column is deleted
+      if (fieldSpecInSchema == null) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: column deleted",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "column deleted: " + columnName);
+      }
+
+      // Field type changed
+      if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: field type", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "field type changed: " + columnName);
+      }
+
+      // Data type changed
+      if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: data type", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "data type changed: " + columnName);
+      }
+
+      // SV/MV changed
+      if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: single / multi value",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "single / multi value changed: " + columnName);
+      }
+
+      // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa.
+      // TODO: RV TEST
+      boolean colHasDictionary = columnMetadata.hasDictionary();
+      // Encoding changed
+      if (colHasDictionary == noDictionaryColumns.contains(columnName)) {
+        // Check if dictionary update is needed
+        // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is
+        // needed.
+        // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden,
+        // refresh is needed, since table has it enabled.
+        boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride(
+            tableConfig.getIndexingConfig().isOptimizeDictionary(),
+            tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
+            tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
+            tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema,
+            indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries());
+        if (incompatible) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: dictionary encoding",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "dictionary encoding changed: " + columnName);
+        } else {
+          LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}",
+              tableNameWithType, segmentName, columnName);
+        }
+      }
+
+      // Sorted column not sorted
+      if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: sort column", tableNameWithType,
+            columnName, segmentName);
+        return new StaleSegment(segmentName, true, "sort column changed: " + columnName);
+      }
+
+      // For the index checks below, "index missing in segment" == "index enabled in table config" means the two
+      // disagree (either the index must be added or it must be removed).
+      if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: bloom filter changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "bloom filter changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: json index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "json index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: text index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "text index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: fst index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "fst index changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: h3 index changed",
+            tableNameWithType, columnName, segmentName);
+        // Reason previously read "hst index changed"; corrected to match the index name and the debug log above.
+        return new StaleSegment(segmentName, true, "h3 index changed: " + columnName);
+      }
+
+      // If a segment is sorted then it will automatically be given an inverted index and that overrides the
+      // TableConfig setting
+      if (columnMetadata.isSorted()) {
+        // If a column is sorted and does not have an inverted index but the table config does have an inverted index.
+        // But do not remove the inverted index from a sorted column even if the table config has no inverted index.
+        if (Objects.isNull(source.getInvertedIndex()) && invertedIndex.contains(columnName)) {
+          LOGGER.debug(
+              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index added to sorted column",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "invert index added to sort column: " + columnName);
+        }
+      } else {
+        if ((Objects.isNull(source.getInvertedIndex())) == invertedIndex.contains(columnName)) {
+          LOGGER.debug(
+              "tableNameWithType: {}, columnName: {}, segmentName: {}, change: inverted index changed on unsorted "
+                  + "column",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "inverted index changed on unsorted column: " + columnName);
+        }
+      }
+
+      // If a column has a NVV Reader and the Table Config says that it should not, then the NVV Reader can be removed.
+      // BUT if a column does NOT have a NVV Reader it cannot be added after the segment is created. So, for this check
+      // only check to see if an existing NVV Reader should be removed, but do not check if an NVV Reader needs to be
+      // added.
+      if (!Objects.isNull(source.getNullValueVector()) && !nullValueVectorIndex.contains(columnName)) {
+        LOGGER.debug(
+            "tableNameWithType: {}, columnName: {}, segmentName: {}, change: null value vector index removed from "
+                + "column and cannot be added back to this segment.", tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "null value vector index removed from column: " + columnName);
+      }
+
+      if (Objects.isNull(source.getRangeIndex()) == rangeIndex.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: range index changed",
+            tableNameWithType, columnName, segmentName);
+        return new StaleSegment(segmentName, true, "range index changed: " + columnName);
+      }
+
+      // Partition changed or segment not properly partitioned
+      // (partitionConfig is non-null whenever partitionColumn is non-null; both are set together above.)
+      if (columnName.equals(partitionColumn)) {
+        PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
+        if (partitionFunction == null) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partition function added: " + columnName);
+        }
+        if (!partitionFunction.getName().equalsIgnoreCase(partitionConfig.getFunctionName())) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partition function name",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partition function name changed: " + columnName);
+        }
+        if (partitionFunction.getNumPartitions() != partitionConfig.getNumPartitions()) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: num partitions",
+              tableNameWithType, columnName, segmentName);
+          return new StaleSegment(segmentName, true, "num partitions changed: " + columnName);
+        }
+        // A properly partitioned segment holds exactly one partition of the partition column.
+        Set<Integer> partitions = columnMetadata.getPartitions();
+        if (partitions == null || partitions.size() != 1) {
+          LOGGER.debug("tableNameWithType: {}, columnName: {}, segmentName: {}, change: partitions", tableNameWithType,
+              columnName, segmentName);
+          return new StaleSegment(segmentName, true, "partitions changed: " + columnName);
+        }
+      }
+    }
+
+    return new StaleSegment(segmentName, false, null);
+  }
+
+
   private SegmentDirectory initSegmentDirectory(String segmentName, String 
segmentCrc,
       IndexLoadingConfig indexLoadingConfig)
       throws Exception {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
new file mode 100644
index 0000000000..14cfd8cb11
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
+import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+@Test
+public class BaseTableDataManagerNeedRefreshTest {
+  // Scratch location on local disk. createSegment() writes every segment it builds under TABLE_DATA_DIR.
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"BaseTableDataManagerNeedRefreshTest");
+  private static final String DEFAULT_TABLE_NAME = "mytable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(DEFAULT_TABLE_NAME);
+  private static final File TABLE_DATA_DIR = new File(TEMP_DIR, 
OFFLINE_TABLE_NAME);
+
+  // Column names used by the baseline schema (see getSchema()) and by the individual tests.
+  private static final String DEFAULT_TIME_COLUMN_NAME = "DaysSinceEpoch";
+  private static final String MS_SINCE_EPOCH_COLUMN_NAME = 
"MilliSecondsSinceEpoch";
+  private static final String TEXT_INDEX_COLUMN = "textColumn";
+  private static final String TEXT_INDEX_COLUMN_MV = "textColumnMV";
+  private static final String PARTITIONED_COLUMN_NAME = "partitionedColumn";
+  private static final String DISTANCE_COLUMN_NAME = "Distance";
+  private static final String CARRIER_COLUMN_NAME = "Carrier";
+  private static final int NUM_PARTITIONS = 20; // For modulo function
+  // NOTE(review): presumably mixed-case on purpose, to exercise a case-insensitive comparison of the partition
+  // function name in isSegmentStale() - confirm.
+  private static final String PARTITION_FUNCTION_NAME = "MoDuLo";
+
+  private static final String JSON_INDEX_COLUMN = "jsonField";
+  private static final String FST_TEST_COLUMN = "DestCityName";
+  // Declared in the schema but never populated by generateRows().
+  private static final String NULL_VALUE_COLUMN = "NullValueColumn";
+
+  // Baseline fixtures shared by the tests; all four are assigned exactly once in the static initializer below.
+  private static final TableConfig TABLE_CONFIG;
+  private static final Schema SCHEMA;
+  private static final ImmutableSegmentDataManager 
IMMUTABLE_SEGMENT_DATA_MANAGER;
+  private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER;
+
+  // Name of the running test method, refreshed by setTestName() before each test; the star-tree tests use it as
+  // the segment name.
+  private String _testName = "defaultTestName";
+
+  // Builds the baseline config, schema, segment ("basicSegment") and table data manager when the class is loaded.
+  // The segment is built from TABLE_CONFIG and SCHEMA, so those two must be assigned first. Any failure is
+  // rethrown as a RuntimeException.
+  static {
+    try {
+      TABLE_CONFIG = getTableConfigBuilder().build();
+      SCHEMA = getSchema();
+      IMMUTABLE_SEGMENT_DATA_MANAGER =
+          createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA, 
"basicSegment", generateRows());
+      BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns a new builder pre-populated with the baseline OFFLINE table config: table name, time column
+   * ({@code DaysSinceEpoch}), null handling enabled and {@code textColumn} as a no-dictionary column. Tests
+   * layer their own settings on top before calling {@code build()}.
+   */
+  protected static TableConfigBuilder getTableConfigBuilder() {
+    TableConfigBuilder baseline = new TableConfigBuilder(TableType.OFFLINE);
+    baseline.setTableName(DEFAULT_TABLE_NAME);
+    baseline.setTimeColumnName(DEFAULT_TIME_COLUMN_NAME);
+    baseline.setNullHandlingEnabled(true);
+    baseline.setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN));
+    return baseline;
+  }
+
+  /**
+   * Builds the baseline schema: two date-time columns followed by eight single/multi-value dimensions.
+   * A new instance is returned on every call, so tests may mutate the result freely.
+   */
+  protected static Schema getSchema()
+      throws IOException {
+    Schema.SchemaBuilder schemaBuilder = new Schema.SchemaBuilder();
+
+    // Date-time columns.
+    schemaBuilder.addDateTime(DEFAULT_TIME_COLUMN_NAME, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS");
+    schemaBuilder.addDateTime(MS_SINCE_EPOCH_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
+        "1:MILLISECONDS");
+
+    // Dimension columns, added in the same order as before.
+    schemaBuilder.addSingleValueDimension(PARTITIONED_COLUMN_NAME, FieldSpec.DataType.INT);
+    schemaBuilder.addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addMultiValueDimension(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON);
+    schemaBuilder.addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING);
+    schemaBuilder.addSingleValueDimension(DISTANCE_COLUMN_NAME, FieldSpec.DataType.INT);
+    schemaBuilder.addSingleValueDimension(CARRIER_COLUMN_NAME, FieldSpec.DataType.STRING);
+
+    return schemaBuilder.build();
+  }
+
+  /**
+   * Builds the three fixture rows used for most segments in this class. {@code NullValueColumn} is never
+   * populated.
+   */
+  protected static List<GenericRow> generateRows() {
+    GenericRow day0 = rowOf(20000, "text_index_column_0", "fst_test_column_0", 0, 1000, "c0");
+    GenericRow day1 = rowOf(20001, "text_index_column_1", "fst_test_column_1", 1, 1000, "c1");
+    GenericRow day2 = rowOf(20002, "text_index_column_2", "fst_test_column_2", 2, 2000, "c0");
+
+    // The rows are returned out of time order (day0, day2, day1); testSortColumnMismatch expects
+    // MilliSecondsSinceEpoch to be reported as not sorted.
+    return List.of(day0, day2, day1);
+  }
+
+  /** Creates one fixture row; the millisecond timestamp is derived from {@code daysSinceEpoch}. */
+  private static GenericRow rowOf(int daysSinceEpoch, String textMvValue, String fstValue, int partitionValue,
+      int distance, String carrier) {
+    GenericRow row = new GenericRow();
+    row.putValue(DEFAULT_TIME_COLUMN_NAME, daysSinceEpoch);
+    row.putValue(MS_SINCE_EPOCH_COLUMN_NAME, daysSinceEpoch * 86400L * 1000);
+    row.putValue(TEXT_INDEX_COLUMN, "text_index_column_0");
+    row.putValue(TEXT_INDEX_COLUMN_MV, textMvValue);
+    row.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}");
+    row.putValue(FST_TEST_COLUMN, fstValue);
+    row.putValue(PARTITIONED_COLUMN_NAME, partitionValue);
+    row.putValue(DISTANCE_COLUMN_NAME, distance);
+    row.putValue(CARRIER_COLUMN_NAME, carrier);
+    return row;
+  }
+
+  /**
+   * Builds a v3 segment named {@code segmentName} under {@code TABLE_DATA_DIR}, indexing every row in
+   * {@code rows} according to the given table config and schema.
+   *
+   * @return the directory {@code TABLE_DATA_DIR/segmentName}
+   */
+  private static File createSegment(TableConfig tableConfig, Schema schema,
+      String segmentName, List<GenericRow> rows)
+      throws Exception {
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setSegmentVersion(SegmentVersion.v3);
+    generatorConfig.setSegmentName(segmentName);
+    generatorConfig.setOutDir(TABLE_DATA_DIR.getAbsolutePath());
+
+    // Feed all supplied rows to the index creation driver and build the segment on disk.
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    creationDriver.init(generatorConfig, new GenericRowRecordReader(rows));
+    creationDriver.build();
+
+    return new File(TABLE_DATA_DIR, segmentName);
+  }
+
+  /**
+   * Builds a segment from {@code rows}, loads it, and wraps it in a mocked
+   * {@link ImmutableSegmentDataManager} whose {@code getSegmentName()} and {@code getSegment()} return the
+   * given name and the loaded segment.
+   */
+  private static ImmutableSegmentDataManager createImmutableSegmentDataManager(TableConfig tableConfig,
+      Schema schema, String segmentName, List<GenericRow> rows)
+      throws Exception {
+    // Build the segment on disk, then load it with an index loading config derived from the same inputs.
+    File segmentDir = createSegment(tableConfig, schema, segmentName, rows);
+    ImmutableSegment loadedSegment =
+        ImmutableSegmentLoader.load(segmentDir, new IndexLoadingConfig(tableConfig, schema));
+
+    ImmutableSegmentDataManager mockedManager = mock(ImmutableSegmentDataManager.class);
+    when(mockedManager.getSegmentName()).thenReturn(segmentName);
+    when(mockedManager.getSegment()).thenReturn(loadedSegment);
+    return mockedManager;
+  }
+
+  /** Records the name of the test method about to run, so tests can use it as a segment name. */
+  @BeforeMethod
+  void setTestName(Method method) {
+    String currentTest = method.getName();
+    _testName = currentTest;
+  }
+
+  /**
+   * A segment built without a time column is not stale against its own config and schema, but is reported
+   * stale with reason "time column" once the table config declares one.
+   */
+  @Test
+  void testAddTimeColumn()
+      throws Exception {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).setNullHandlingEnabled(true)
+            .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build();
+
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON)
+        .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING).build();
+
+    GenericRow row = new GenericRow();
+    row.putValue(TEXT_INDEX_COLUMN, "text_index_column");
+    row.putValue(JSON_INDEX_COLUMN, "{\"a\":\"b\"}");
+    row.putValue(FST_TEST_COLUMN, "fst_test_column");
+
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, schema, "noChanges", List.of(row));
+    try {
+      BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager();
+
+      // Same config and schema the segment was built with: nothing to refresh.
+      StaleSegment response = tableDataManager.isSegmentStale(tableConfig, schema, segmentDataManager);
+      assertFalse(response.isStale());
+
+      // Test new time column
+      response = tableDataManager.isSegmentStale(getTableConfigBuilder().build(), getSchema(), segmentDataManager);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), "time column");
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Pointing the table config at a different time column marks the baseline segment stale. */
+  @Test
+  void testChangeTimeColumn() {
+    TableConfig configWithOtherTimeColumn =
+        getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build();
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(configWithOtherTimeColumn, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "time column");
+  }
+
+  /** Dropping a column from the schema marks the baseline segment stale with a "column deleted" reason. */
+  @Test
+  void testRemoveColumn()
+      throws Exception {
+    Schema schemaWithoutTextColumn = getSchema();
+    schemaWithoutTextColumn.removeField(TEXT_INDEX_COLUMN);
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schemaWithoutTextColumn, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "column deleted: textColumn");
+  }
+
+  /** Re-declaring a dimension column as a metric marks the baseline segment stale ("field type changed"). */
+  @Test
+  void testFieldType()
+      throws Exception {
+    MetricFieldSpec textColumnAsMetric = new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true);
+    Schema alteredSchema = getSchema();
+    alteredSchema.removeField(TEXT_INDEX_COLUMN);
+    alteredSchema.addField(textColumnAsMetric);
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, alteredSchema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "field type changed: textColumn");
+  }
+
+  /** Changing a column's data type (STRING to INT) marks the baseline segment stale ("data type changed"). */
+  @Test
+  void testChangeDataType()
+      throws Exception {
+    DimensionFieldSpec textColumnAsInt = new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true);
+    Schema alteredSchema = getSchema();
+    alteredSchema.removeField(TEXT_INDEX_COLUMN);
+    alteredSchema.addField(textColumnAsInt);
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, alteredSchema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "data type changed: textColumn");
+  }
+
+  /** Re-declaring the single-value text column with the single-value flag off marks the segment stale. */
+  @Test
+  void testChangeToMV()
+      throws Exception {
+    DimensionFieldSpec textColumnAsMv =
+        new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false);
+    Schema alteredSchema = getSchema();
+    alteredSchema.removeField(TEXT_INDEX_COLUMN);
+    alteredSchema.addField(textColumnAsMv);
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, alteredSchema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "single / multi value changed: textColumn");
+  }
+
+  /** Re-declaring the multi-value text column with the single-value flag on marks the segment stale. */
+  @Test
+  void testChangeToSV()
+      throws Exception {
+    DimensionFieldSpec mvColumnAsSv =
+        new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true);
+    Schema alteredSchema = getSchema();
+    alteredSchema.removeField(TEXT_INDEX_COLUMN_MV);
+    alteredSchema.addField(mvColumnAsSv);
+
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, alteredSchema, IMMUTABLE_SEGMENT_DATA_MANAGER);
+
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "single / multi value changed: textColumnMV");
+  }
+
+  /**
+   * Declaring a sorted column that is not sorted in the segment marks it stale; declaring one that is sorted
+   * in the segment does not.
+   */
+  @Test
+  void testSortColumnMismatch() {
+    // MilliSecondsSinceEpoch is not sorted in the baseline segment.
+    TableConfig sortedOnUnsortedColumn = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build();
+    StaleSegment staleness =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(sortedOnUnsortedColumn, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertTrue(staleness.isStale());
+    assertEquals(staleness.getReason(), "sort column changed: MilliSecondsSinceEpoch");
+
+    // textColumn holds the same value in every fixture row, so the segment is already sorted on it.
+    TableConfig sortedOnSortedColumn = getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build();
+    StaleSegment notStale =
+        BASE_TABLE_DATA_MANAGER.isSegmentStale(sortedOnSortedColumn, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+    assertFalse(notStale.isStale());
+  }
+
+  /**
+   * Supplies {@code testFilter} with one case per index type. Each case is
+   * {segment name, table config enabling that index, expected staleness reason}.
+   */
+  @DataProvider(name = "testFilterArgs")
+  private Object[][] testFilterArgs() {
+    TableConfig bloomFilterConfig =
+        getTableConfigBuilder().setBloomFilterColumns(List.of(TEXT_INDEX_COLUMN)).build();
+
+    TableConfig jsonIndexConfig = getTableConfigBuilder().setJsonIndexColumns(List.of(JSON_INDEX_COLUMN)).build();
+
+    FieldConfig textFieldConfig =
+        new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.TEXT),
+            null, null);
+    TableConfig textIndexConfig = getTableConfigBuilder().setFieldConfigList(List.of(textFieldConfig)).build();
+
+    FieldConfig fstFieldConfig =
+        new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST),
+            null, Map.of(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL));
+    TableConfig fstIndexConfig = getTableConfigBuilder().setFieldConfigList(List.of(fstFieldConfig)).build();
+
+    TableConfig rangeIndexConfig =
+        getTableConfigBuilder().setRangeIndexColumns(List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build();
+
+    return new Object[][]{
+        {"withBloomFilter", bloomFilterConfig, "bloom filter changed: textColumn"},
+        {"withJsonIndex", jsonIndexConfig, "json index changed: jsonField"},
+        {"withTextIndex", textIndexConfig, "text index changed: textColumn"},
+        {"withFstIndex", fstIndexConfig, "fst index changed: DestCityName"},
+        {"withRangeFilter", rangeIndexConfig, "range index changed: MilliSecondsSinceEpoch"}
+    };
+  }
+
+  /**
+   * For each index type supplied by {@code testFilterArgs}: a mismatch between table config and segment in
+   * either direction marks the segment stale with {@code expectedReason}; a match does not.
+   *
+   * @param segmentName name of the segment built with the index enabled
+   * @param tableConfigWithFilter table config that enables the index
+   * @param expectedReason staleness reason expected whenever config and segment disagree
+   */
+  @Test(dataProvider = "testFilterArgs")
+  void testFilter(String segmentName, TableConfig tableConfigWithFilter, String expectedReason)
+      throws Exception {
+    ImmutableSegmentDataManager segmentWithFilter =
+        createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows());
+    try {
+      // Table config has the index but the segment does not: stale.
+      StaleSegment response =
+          BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), expectedReason);
+
+      // Table config does not have the index but the segment does: stale.
+      response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithFilter);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), expectedReason);
+
+      // Table config and segment both have the index: not stale.
+      assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, segmentWithFilter).isStale());
+    } finally {
+      // Release the segment loaded for this invocation; the data manager is a mock and will not do it for us.
+      segmentWithFilter.getSegment().destroy();
+    }
+  }
+
+  /**
+   * Covers partition-config staleness: adding a partition config, changing the number of partitions, or
+   * changing the partition function marks a segment stale; removing the partition config does not.
+   */
+  @Test
+  void testPartition()
+      throws Exception {
+    TableConfig partitionedTableConfig = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig(
+        Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS)))).build();
+    ImmutableSegmentDataManager segmentWithPartition =
+        createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA, "partitionWithModulo", generateRows());
+    try {
+      // Segment has no partition info AND table config has a partition config: stale.
+      StaleSegment response =
+          BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), "partition function added: partitionedColumn");
+
+      // Segment has partition info AND table config has no partition config: not stale.
+      assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithPartition).isStale());
+
+      // Number of partitions differs: stale.
+      TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(
+          new SegmentPartitionConfig(
+              Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build();
+
+      response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig40, SCHEMA, segmentWithPartition);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), "num partitions changed: partitionedColumn");
+
+      // Partition function differs: stale.
+      TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig(
+          new SegmentPartitionConfig(
+              Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build();
+
+      response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfigMurmur, SCHEMA, segmentWithPartition);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), "partition function name changed: partitionedColumn");
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentWithPartition.getSegment().destroy();
+    }
+  }
+
+  /**
+   * Disabling null handling marks a segment that has a null value vector stale; enabling it for a segment
+   * built without null handling does not.
+   */
+  @Test
+  void testNullValueVector()
+      throws Exception {
+    TableConfig withoutNullHandling = getTableConfigBuilder().setNullHandlingEnabled(false).build();
+    ImmutableSegmentDataManager segmentWithoutNullHandling =
+        createImmutableSegmentDataManager(withoutNullHandling, SCHEMA, "withoutNullHandling", generateRows());
+    try {
+      // Null handling removed from the table config AND segment has a null value vector (NVV): the NVV can be
+      // removed, so the segment is stale.
+      StaleSegment response =
+          BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER);
+      assertTrue(response.isStale());
+      assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn");
+
+      // Null handling enabled in the table config AND segment has no NVV: it cannot be added, so not stale.
+      assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithoutNullHandling).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentWithoutNullHandling.getSegment().destroy();
+    }
+  }
+
+  /** Test 1: Adding a StarTree index to the table config marks a segment built without one stale. */
+  @Test
+  public void addStartreeIndex()
+      throws Exception {
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+
+    // The segment itself is built from the baseline config, i.e. without any StarTree index.
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(getTableConfigBuilder().build(), SCHEMA, _testName, generateRows());
+    try {
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /**
+   * Test 2: A StarTree index whose split-dimension list has the same size but a different column marks the
+   * segment stale.
+   */
+  @Test
+  public void testStarTreeIndexWithDifferentColumn()
+      throws Exception {
+    // Create a segment with StarTree index on Carrier.
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config now asks for a StarTree index on Distance instead.
+      StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Distance"), null,
+          Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newTableConfig =
+          getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 3: A StarTree index with a different number of split-dimension columns marks the segment stale. */
+  @Test
+  public void testStarTreeIndexWithManyColumns()
+      throws Exception {
+    // Create a segment with StarTree index on Carrier.
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config now asks for a StarTree index on Carrier and Distance.
+      StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+          Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newTableConfig =
+          getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 4: A StarTree index with the same split-dimension columns in a different order marks the segment stale. */
+  @Test
+  public void testStartIndexWithDifferentOrder()
+      throws Exception {
+    // Create a segment with StarTree index on Carrier, Distance.
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config now lists the split dimensions as Distance, Carrier.
+      StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Distance", "Carrier"), null,
+          Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 5: Adding skipped dimension columns to an existing StarTree index config marks the segment stale. */
+  @Test
+  void testStarTreeIndexWithSkipDimCols()
+      throws Exception {
+    // Create a segment with StarTree index on Carrier, Distance and no skipped dimensions.
+    StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Same split dimensions, but Carrier and Distance are now also listed as skipped dimensions.
+      StarTreeIndexConfig newStarTreeIndexConfig =
+          new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+              Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 6: Listing the same skipped dimension columns in a different order does not mark the segment stale. */
+  @Test
+  void testStarTreeIndexWithDiffOrderSkipDimCols()
+      throws Exception {
+    // Segment built with skipped dimensions listed as Carrier, Distance.
+    StarTreeIndexConfig starTreeIndexConfig =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+            Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config now lists the skipped dimensions as Distance, Carrier.
+      StarTreeIndexConfig newStarTreeIndexConfig =
+          new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Distance", "Carrier"),
+              Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 7: Removing the skipped dimension columns from a StarTree index config marks the segment stale. */
+  @Test
+  void testStarTreeIndexRemoveSkipDimCols()
+      throws Exception {
+    // Segment built with Carrier and Distance as skipped dimensions.
+    StarTreeIndexConfig starTreeIndexConfig =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"),
+            Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config no longer lists any skipped dimensions.
+      StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+          Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+      TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  /** Test 8: Adding a metric aggregation function to a StarTree index config marks the segment stale. */
+  @Test
+  void testStarTreeIndexAddAggFn()
+      throws Exception {
+    // Segment built with COUNT(*) as the only aggregation.
+    StarTreeIndexConfig starTreeIndex = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndex)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows());
+    try {
+      // Table config additionally asks for MAX__Distance.
+      StarTreeIndexConfig starTreeIndexAddAggFn = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+          Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), "MAX__Distance"), null, 100);
+      TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexAddAggFn)).build();
+      assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale());
+    } finally {
+      // Release the segment loaded for this test; the data manager is a mock and will not do it for us.
+      segmentDataManager.getSegment().destroy();
+    }
+  }
+
+  @Test
+  void testStarTreeIndexDiffOrderAggFn()
+      throws Exception {
+    // Test 9: Adding a new StarTree index with the same aggregation functions 
but in different order should not
+    // trigger segment refresh.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), 
"MAX__Distance"), null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    StarTreeIndexConfig newStarTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Arrays.asList("MAX__Distance", 
AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+    TableConfig newConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexRemoveAggFn()
+      throws Exception {
+    // Test 10: removing an aggregation function through aggregation config 
should trigger segment refresh.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        Arrays.asList("MAX__Distance", 
AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100);
+
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    StarTreeIndexConfig newStarTreeIndexConfig =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, 
null,
+            List.of(new StarTreeAggregationConfig("Distance", "MAX")), 100);
+    TableConfig newConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexNewMetricAgg()
+      throws Exception {
+    // Test 11: Adding a new metric aggregation function through 
functionColumnPairs should trigger segment refresh.
+    StarTreeAggregationConfig aggregationConfig = new 
StarTreeAggregationConfig("Distance", "MAX");
+    StarTreeIndexConfig starTreeIndexConfig =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, 
null, List.of(aggregationConfig), 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    // Create a StarTree index.
+    StarTreeIndexConfig newStarTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
+        Collections.singletonList(aggregationConfig), 100);
+    TableConfig newConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexDiffOrderAggFn2()
+      throws Exception {
+    // Test 12: Adding a new StarTree index with different ordered aggregation 
functions through aggregation config
+    // should not trigger segment refresh.
+
+    StarTreeAggregationConfig aggregationConfig = new 
StarTreeAggregationConfig("Distance", "MAX");
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
+        Collections.singletonList(aggregationConfig), 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    StarTreeAggregationConfig starTreeAggregationConfig2 = new 
StarTreeAggregationConfig("*", "count");
+    StarTreeIndexConfig newStarTreeIndexConfig =
+        new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, 
null,
+            Arrays.asList(starTreeAggregationConfig2, aggregationConfig), 100);
+    TableConfig newConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexMaxLeafNode()
+      throws Exception {
+    // Test 13: Changing the last StarTreeIndexConfig argument from 100 to 10 
+    // (presumably maxLeafRecords - confirm against the constructor) should 
+    // trigger segment refresh.
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    // Same split order and function-column pairs as above; only the last 
+    // argument differs.
+    StarTreeIndexConfig newStarTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 10);
+    TableConfig newConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build();
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexRemove()
+      throws Exception {
+    // Test 14: Removing the StarTree index config from the table config should 
+    // trigger segment refresh.
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+    // The config checked against is built without calling 
+    // setStarTreeIndexConfigs(), unlike the one used to create the segment.
+    assertTrue(
+        
BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().build(), SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexAddMultiple()
+      throws Exception {
+    // Test 15: Add multiple StarTree Indexes should trigger segment refresh.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    StarTreeIndexConfig newStarTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Distance"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig newTableConfig =
+        
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, 
newStarTreeIndexConfig)).build();
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexEnableDefault()
+      throws Exception {
+    // Test 16: Enabling default StarTree index should trigger a segment 
refresh.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexNoChanges()
+      throws Exception {
+    // Test 17: Checking a segment against the same, unchanged table config 
should not report it as stale.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+
+  @Test
+  void testStarTreeIndexDisableDefault()
+      throws Exception {
+    // Test 18: Disabling default StarTree index should trigger a segment 
refresh.
+
+    StarTreeIndexConfig starTreeIndexConfig = new 
StarTreeIndexConfig(Collections.singletonList("Carrier"), null,
+        
Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()),
 null, 100);
+    TableConfig tableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    tableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+    ImmutableSegmentDataManager segmentDataManager =
+        createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, 
generateRows());
+
+    TableConfig newTableConfig = 
getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build();
+    newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false);
+    assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, 
segmentDataManager).isStale());
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 69a8d88fd6..1d17315aa7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -648,7 +648,7 @@ public class BaseTableDataManagerTest {
     }
   }
 
-  private static OfflineTableDataManager createTableManager() {
+  static OfflineTableDataManager createTableManager() {
     return createTableManager(createDefaultInstanceDataManagerConfig());
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
new file mode 100644
index 0000000000..c2fbd83cb9
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class StaleSegmentCheckIntegrationTest extends 
BaseClusterIntegrationTest {
+  private static final String JSON_FIELD = "jsonField";
+
+  private PinotTaskManager _taskManager;
+  private PinotHelixTaskResourceManager _taskResourceManager;
+  private TableConfig _tableConfig;
+  private Schema _schema;
+  private List<File> _avroFiles;
+  private static final String H3_INDEX_COLUMN = "h3Column";
+  private static final Map<String, String> H3_INDEX_PROPERTIES = 
Collections.singletonMap("resolutions", "5");
+  private static final String TEXT_INDEX_COLUMN = "textColumn";
+  private static final String NULL_INDEX_COLUMN = "nullField";
+
+  private static final String JSON_INDEX_COLUMN = "jsonField";
+  private static final String FST_TEST_COLUMN = "DestCityName";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+    startMinion();
+    // Start Kafka
+    startKafka();
+
+    _taskManager = _controllerStarter.getTaskManager();
+    _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config
+    _schema = createSchema();
+    _schema.addField(new DimensionFieldSpec(JSON_FIELD, 
FieldSpec.DataType.STRING, true));
+    _schema.addField(new DimensionFieldSpec(NULL_INDEX_COLUMN, 
FieldSpec.DataType.STRING, true));
+    _schema.addField(new DimensionFieldSpec(H3_INDEX_COLUMN, 
FieldSpec.DataType.BYTES, true));
+    _schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, 
FieldSpec.DataType.STRING, true));
+
+    addSchema(_schema);
+
+    _tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setTimeColumnName(getTimeColumnName())
+            
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(true)
+            
.setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build();
+    addTableConfig(_tableConfig);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, 
_tableConfig, _schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(300_000L);
+  }
+
+  private FieldConfig getH3FieldConfig() {
+    return new FieldConfig(H3_INDEX_COLUMN, 
FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.H3, null,
+        H3_INDEX_PROPERTIES);
+  }
+
+  private FieldConfig getTextFieldConfig() {
+    return new FieldConfig(TEXT_INDEX_COLUMN, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null, null);
+  }
+
+  private FieldConfig getFstFieldConfig() {
+    Map<String, String> propertiesMap = new HashMap<>();
+    propertiesMap.put(FieldConfig.TEXT_FST_TYPE, 
FieldConfig.TEXT_NATIVE_FST_LITERAL);
+    return new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null,
+        propertiesMap);
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    List<TransformConfig> transforms = new ArrayList<>();
+    transforms.add(new TransformConfig(JSON_INDEX_COLUMN,
+        
"Groovy({'{\"DestState\":\"'+DestState+'\",\"OriginState\":\"'+OriginState+'\"}'},
 DestState, OriginState)"));
+    transforms.add(new TransformConfig(NULL_INDEX_COLUMN, "Groovy({null})"));
+    // This is the byte encoding of ST_POINT(-122, 37)
+    transforms.add(new TransformConfig(H3_INDEX_COLUMN,
+        
"Groovy({[0x00,0xc0,0x5e,0x80,0x00,0x00,0x00,0x00,0x00,0x40,0x42,0x80,0x00,0x00,0x00,0x00,0x00]
 as byte[]})"));
+    transforms.add(new TransformConfig(TEXT_INDEX_COLUMN, "Groovy({\"Hello 
this is a text column\"})"));
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transforms);
+
+    return ingestionConfig;
+  }
+
+  @Test
+  public void testAddRemoveSortedIndex()
+      throws Exception {
+    // Add a sorted column to the table
+    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+    indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+    updateTableConfig(_tableConfig);
+
+    Map<String, TableStaleSegmentResponse> needRefreshResponses = 
getStaleSegmentsResponse();
+    assertEquals(needRefreshResponses.size(), 1);
+    
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
 12);
+  }
+
+  @Test(dependsOnMethods = "testAddRemoveSortedIndex")
+  public void testAddRemoveRawIndex()
+      throws Exception {
+    // Add a raw index column
+    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+    
indexingConfig.setNoDictionaryColumns(Collections.singletonList("ActualElapsedTime"));
+    updateTableConfig(_tableConfig);
+
+    Map<String, TableStaleSegmentResponse> needRefreshResponses = 
getStaleSegmentsResponse();
+    assertEquals(needRefreshResponses.size(), 1);
+    
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
 12);
+  }
+
+  @Test(dependsOnMethods = "testAddRemoveSortedIndex")
+  public void testH3IndexChange()
+      throws Exception {
+    // Add a H3 index column
+    
_tableConfig.setFieldConfigList(Collections.singletonList(getH3FieldConfig()));
+    updateTableConfig(_tableConfig);
+
+    Map<String, TableStaleSegmentResponse> needRefreshResponses = 
getStaleSegmentsResponse();
+    assertEquals(needRefreshResponses.size(), 1);
+    
assertEquals(needRefreshResponses.values().iterator().next().getStaleSegmentList().size(),
 12);
+  }
+
+  private Map<String, TableStaleSegmentResponse> getStaleSegmentsResponse()
+      throws IOException {
+    return JsonUtils.stringToObject(sendGetRequest(
+            _controllerRequestURLBuilder.forStaleSegments(
+                TableNameBuilder.OFFLINE.tableNameWithType(getTableName()))),
+        new TypeReference<Map<String, TableStaleSegmentResponse>>() { });
+  }
+
+  /**
+   * Stops the cluster components started in {@link #setUp()} and deletes the
+   * temporary directory, even if stopping a component fails.
+   */
+  @AfterClass
+  public void tearDown() {
+    try {
+      stopMinion();
+      stopServer();
+      stopBroker();
+      stopController();
+      // setUp() also calls startKafka(); stop it here so it does not outlive
+      // this test class.
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
new file mode 100644
index 0000000000..3e67093f1d
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/StaleSegment.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.data.manager;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Describes the staleness of a single segment: the segment name, whether it is
+ * stale, and the reason reported for it.
+ */
+public class StaleSegment {
+  private final String _segmentName;
+  private final boolean _isStale;
+  private final String _reason;
+
+  /**
+   * JSON creator. Only the segment name and reason are read from JSON (the
+   * stale flag is marked {@code @JsonIgnore}), so an instance created through
+   * this constructor is always stale.
+   */
+  @JsonCreator
+  public StaleSegment(@JsonProperty("segmentName") String segmentName, @JsonProperty("reason") String reason) {
+    this(segmentName, true, reason);
+  }
+
+  /**
+   * Creates an instance with an explicit staleness flag.
+   */
+  public StaleSegment(String segmentName, boolean isStale, String reason) {
+    _segmentName = segmentName;
+    _isStale = isStale;
+    _reason = reason;
+  }
+
+  /** Name of the segment this entry refers to. */
+  @JsonProperty
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /** Whether the segment is stale; excluded from the JSON form. */
+  @JsonIgnore
+  public boolean isStale() {
+    return _isStale;
+  }
+
+  /** Reason string supplied at construction time. */
+  @JsonProperty
+  public String getReason() {
+    return _reason;
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 092701bdef..cf7e623269 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -323,4 +323,12 @@ public interface TableDataManager {
    */
   default void onConsumingToOnline(String segmentNameStr) {
   }
+
+  /**
+   * Returns the list of segments that are stale, along with the reason each one is stale.
+   * @param tableConfig Table Config of the table
+   * @param schema Schema of the table
+   * @return List of {@link StaleSegment} with segment names and reason why it 
is stale
+   */
+  List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema);
 }
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b393ac050e..0506396e7e 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -84,6 +84,7 @@ import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.StaleSegment;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -96,9 +97,11 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.api.AdminApiApplication;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -1082,4 +1085,26 @@ public class TablesResource {
     return ResourceUtils.convertToJsonString(
         new ServerSegmentsReloadCheckResponse(needReload, 
tableDataManager.getInstanceId()));
   }
+
+  /**
+   * Lists the segments of the given table that the table data manager reports
+   * as stale with respect to the table config and schema returned by
+   * {@code fetchTableConfigAndSchema()}.
+   *
+   * @param tableName table name with type
+   * @param headers request headers, used to translate the table name
+   * @return the stale segments returned by the table data manager
+   * @throws WebApplicationException with status 500 if fetching the table
+   *     config/schema or the staleness check fails
+   */
+  @GET
+  @Path("/tables/{tableName}/segments/isStale")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the list of segments that are stale or deviated from table config.",
+      notes = "Get the list of segments that are stale or deviated from table config")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
+      message = "Internal Server error", response = ErrorInfo.class)
+  })
+  public List<StaleSegment> getStaleSegments(
+      @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
+      @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+    try {
+      Pair<TableConfig, Schema> tableConfigSchemaPair = tableDataManager.fetchTableConfigAndSchema();
+      return tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight());
+    } catch (Exception e) {
+      // Pass the original exception as the cause so its stack trace is not lost.
+      throw new WebApplicationException(e.getMessage(), e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 335f251995..da83dc2194 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -244,6 +244,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, 
query);
   }
 
+  /**
+   * Returns the controller URL used to list stale segments of a table, built
+   * from the base URL, "segments", the given table name and "isStale" via
+   * {@code StringUtil.join("/", ...)}.
+   */
+  public String forStaleSegments(String tableNameWithType) {
+    return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, 
"isStale");
+  }
+
   public String forTableRebalanceStatus(String jobId) {
     return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to