This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f9d23b0f42 Backfill missing length stats during forward-index reload (#18496)
5f9d23b0f42 is described below

commit 5f9d23b0f428f711bb5477d0bd1def1943439610
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 14 13:41:15 2026 -0700

    Backfill missing length stats during forward-index reload (#18496)
    
    When a forward-index update (compression change, dict→raw, or rebuild via
    InvertedIndexAndDictionaryBasedForwardIndexCreator) runs against a pre-1.6.0
    segment, the per-element length stats (`LENGTH_OF_SHORTEST_ELEMENT`,
    `LENGTH_OF_LONGEST_ELEMENT`, `MAX_ROW_LENGTH_IN_BYTES` for MV,
    `IS_ASCII` for STRING) may be missing from `ColumnMetadata`. The old code
    fell back to one of two ad-hoc scan paths inside the per-op methods, and
    the inv→fwd creator didn't backfill at all. After this change, the stats
    are gathered once and persisted to `metadata.properties` as a side effect
    of the rebuild, so subsequent reloads see complete metadata.
    
    ForwardIndexHandler:
    - `updateIndices` runs a per-column `backfillMissingStats`
      pre-pass before dispatching ops. The helper opens the forward-index
      reader, scans the column once, and persists the gathered stats. No-op
      for fixed-width columns, columns whose metadata already has the stats,
      or columns whose forward index is absent on disk.
    - The reader-driven scan in `rewriteForwardIndexForCompressionChange` and
      the metadata-driven scan in `rewriteDictToRawForwardIndex` are removed.
      Per-op methods just build an `IndexCreationContext` from `ColumnMetadata`
      — the pre-pass guarantees the lengths are populated.
    - The `IndexCreationContext.Builder` constructor reads stats from
      `ColumnMetadata`, so the explicit `withLengthOfLongestElement` /
      `withMaxRowLengthInBytes` overrides on the per-op builders go away.
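
A minimal sketch of what a single-pass length-stats scan over a variable-width STRING column could look like. It is illustrative only, not the code in this commit: `LengthStatsSketch` and `scan` are hypothetical names, the input is a plain list of strings rather than a forward-index reader, and element lengths are assumed to be UTF-8 byte counts.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical holder for the three per-element stats gathered in one pass.
final class LengthStatsSketch {
  final int shortest;
  final int longest;
  final boolean ascii;

  private LengthStatsSketch(int shortest, int longest, boolean ascii) {
    this.shortest = shortest;
    this.longest = longest;
    this.ascii = ascii;
  }

  // Scans every value once, tracking min/max encoded length and whether all bytes are 7-bit.
  static LengthStatsSketch scan(List<String> values) {
    if (values.isEmpty()) {
      throw new IllegalArgumentException("Cannot gather length stats from an empty column");
    }
    int shortest = Integer.MAX_VALUE;
    int longest = 0;
    boolean ascii = true;
    for (String value : values) {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      shortest = Math.min(shortest, bytes.length);
      longest = Math.max(longest, bytes.length);
      if (ascii) {
        for (byte b : bytes) {
          // A negative byte has its high bit set, so the value is not pure ASCII.
          if (b < 0) {
            ascii = false;
            break;
          }
        }
      }
    }
    return new LengthStatsSketch(shortest, longest, ascii);
  }

  public static void main(String[] args) {
    LengthStatsSketch stats = scan(List.of("ab", "h\u00e9llo", "x"));
    System.out.println(stats.shortest + " " + stats.longest + " " + stats.ascii);
  }
}
```

Gathering all three values in the same loop is what lets a pre-pass of this kind touch the column only once.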
    
    InvertedIndexAndDictionaryBasedForwardIndexCreator:
    - Constructor takes `FieldIndexConfigs` directly; the two derived booleans
      (`dictionaryEnabled`, `dictionaryBasedForwardIndex`) and the separate
      `ForwardIndexConfig` argument are gone.
    - `_dictionaryPresent` renamed to `_keepDictionary` with a sharper javadoc
      that names the post-rebuild "kept vs. dropped" semantic.
    - Both SV and MV paths gain inline tracking of per-element length stats
      from the dictionary when the source segment is missing them, so the
      backfill happens during the existing main loop with no extra pass over
      the column. The values flow into both the `IndexCreationContext.Builder`
      (so the new forward-index creator sees them) and the persisted
      `metadataProperties` map.
    - For MV var-length, `MAX_ROW_LENGTH_IN_BYTES` is always recomputed and
      persisted — in-row duplicates can drop during the rebuild and change
      the per-row max.
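
To illustrate why the per-row maximum may need recomputing, here is a small sketch under simplifying assumptions: rows are lists of strings, the row length is the sum of the UTF-8 byte lengths of its elements, and the rebuild is modeled as keeping each distinct value once per row. `MaxRowLengthSketch` and its methods are hypothetical names, not Pinot APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

final class MaxRowLengthSketch {
  // Largest per-row total of element byte lengths across all rows.
  static int maxRowLengthInBytes(List<List<String>> rows) {
    int max = 0;
    for (List<String> row : rows) {
      int rowLength = 0;
      for (String value : row) {
        rowLength += value.getBytes(StandardCharsets.UTF_8).length;
      }
      max = Math.max(max, rowLength);
    }
    return max;
  }

  // Models a rebuild that keeps each distinct value once per row (assumption for illustration).
  static List<List<String>> dropInRowDuplicates(List<List<String>> rows) {
    List<List<String>> result = new ArrayList<>();
    for (List<String> row : rows) {
      result.add(new ArrayList<>(new LinkedHashSet<>(row)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<String>> rows = List.of(List.of("aa", "aa", "b"), List.of("cccc"));
    System.out.println(maxRowLengthInBytes(rows));
    System.out.println(maxRowLengthInBytes(dropInRowDuplicates(rows)));
  }
}
```

In this example the row holding a duplicate stops being the longest once the duplicate is dropped, so a value carried over from the old metadata would overstate the maximum.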
    
    SegmentMetadataUtils.updateMetadataProperties:
    - A `null` value now removes the property via `clearProperty` rather than
      writing the literal string. Callers can drop a key by mapping it to
      null.
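
The "null means remove" convention can be sketched as follows. This is a stand-in that uses `java.util.Properties` instead of the configuration object Pinot actually writes to; `NullRemovesPropertySketch`, `applyUpdates`, and the key names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class NullRemovesPropertySketch {
  // Applies updates to the target: a null value removes the key, any other value overwrites it.
  static void applyUpdates(Properties target, Map<String, String> updates) {
    for (Map.Entry<String, String> entry : updates.entrySet()) {
      if (entry.getValue() == null) {
        target.remove(entry.getKey());
      } else {
        target.setProperty(entry.getKey(), entry.getValue());
      }
    }
  }

  public static void main(String[] args) {
    Properties metadata = new Properties();
    metadata.setProperty("column.foo.bitsPerElement", "4");
    metadata.setProperty("column.foo.hasDictionary", "true");

    // HashMap permits null values, so a caller can express "drop this key".
    Map<String, String> updates = new HashMap<>();
    updates.put("column.foo.bitsPerElement", null);
    updates.put("column.foo.hasDictionary", "false");

    applyUpdates(metadata, updates);
    System.out.println(metadata);
  }
}
```

Note that the caller's map must allow null values (a `HashMap` does, `Map.of` does not).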
    
    BaseIndexHandler.createForwardIndexIfNeeded:
    - Simplified to pass `_fieldIndexConfigs.get(columnName)` directly to the
      inv→fwd creator constructor instead of three derived locals.
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../creator/impl/stats/StatsCollectorUtil.java     |   9 +-
 .../segment/index/loader/BaseIndexHandler.java     |  11 +-
 .../segment/index/loader/ForwardIndexHandler.java  | 245 ++++++++++-----------
 ...IndexAndDictionaryBasedForwardIndexCreator.java | 148 +++++++++----
 .../index/loader/ForwardIndexHandlerTest.java      | 126 +++++++++++
 .../invertedindex/VectorIndexHandlerTest.java      |  44 ++--
 .../org/apache/pinot/segment/spi/V1Constants.java  |   4 +-
 .../spi/index/metadata/ColumnMetadataImpl.java     |  10 +-
 .../segment/spi/utils/SegmentMetadataUtils.java    |  10 +-
 9 files changed, 396 insertions(+), 211 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java
index 1a3b5e8bb66..43a6f8d0fec 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StatsCollectorUtil.java
@@ -43,12 +43,9 @@ public final class StatsCollectorUtil {
   public static AbstractColumnStatisticsCollector createStatsCollector(String columnName, FieldSpec fieldSpec,
       FieldIndexConfigs indexConfig, StatsCollectorConfig statsCollectorConfig) {
     boolean dictionaryEnabled = indexConfig.getConfig(StandardIndexes.dictionary()).isEnabled();
-    if (!dictionaryEnabled) {
-      // MAP collector is optimised for no-dictionary collection
-      if (!fieldSpec.getDataType().getStoredType().equals(FieldSpec.DataType.MAP)) {
-        if (ClusterConfigForTable.useOptimizedNoDictCollector(statsCollectorConfig.getTableConfig())) {
-          return new NoDictColumnStatisticsCollector(columnName, statsCollectorConfig);
-        }
+    if (!dictionaryEnabled && fieldSpec.getDataType().getStoredType() != FieldSpec.DataType.MAP) {
+      if (ClusterConfigForTable.useOptimizedNoDictCollector(statsCollectorConfig.getTableConfig())) {
+        return new NoDictColumnStatisticsCollector(columnName, statsCollectorConfig);
       }
     }
     switch (fieldSpec.getDataType().getStoredType()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
index a5c68d21c8f..dc488a1ee71 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
@@ -26,12 +26,10 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.IndexHandler;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
@@ -62,6 +60,8 @@ public abstract class BaseIndexHandler implements IndexHandler {
       TableConfig tableConfig, Schema schema) {
     _segmentDirectory = segmentDirectory;
     SegmentMetadataImpl segmentMetadata = segmentDirectory.getSegmentMetadata();
+    Preconditions.checkState(segmentMetadata.getTotalDocs() > 0, "Got empty segment: %s in index handler",
+        segmentMetadata.getName());
     if (fieldIndexConfigs.keySet().equals(segmentMetadata.getAllColumns())) {
       _fieldIndexConfigs = fieldIndexConfigs;
     } else {
@@ -110,14 +110,9 @@ public abstract class BaseIndexHandler implements IndexHandler {
 
     LOGGER.info("Rebuilding the forward index for column: {}, is temporary: {}", columnName, isTemporaryForwardIndex);
 
-    FieldIndexConfigs fieldIndexConfig = _fieldIndexConfigs.get(columnName);
-    boolean dictionaryEnabled = fieldIndexConfig.getConfig(StandardIndexes.dictionary()).isEnabled();
-    ForwardIndexConfig forwardIndexConfig = fieldIndexConfig.getConfig(StandardIndexes.forward());
-    boolean dictionaryBasedForwardIndex = forwardIndexConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY;
-
     InvertedIndexAndDictionaryBasedForwardIndexCreator creator =
         new InvertedIndexAndDictionaryBasedForwardIndexCreator(_segmentDirectory, segmentWriter, _tableConfig,
-            columnName, dictionaryEnabled, dictionaryBasedForwardIndex, forwardIndexConfig, isTemporaryForwardIndex);
+            columnName, _fieldIndexConfigs.get(columnName), isTemporaryForwardIndex);
     creator.regenerateForwardIndex();
     // Validate that the forward index is created.
     if (!segmentWriter.hasIndexFor(columnName, StandardIndexes.forward())) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 0aba4f1a52e..e0f2e200fd0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.segment.index.loader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +32,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPreIndexStatsCollector;
@@ -68,7 +66,7 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
-import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.EncodingType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -144,6 +142,11 @@ public class ForwardIndexHandler extends BaseIndexHandler {
       String column = entry.getKey();
       List<Operation> operations = entry.getValue();
 
+      // Backfill missing 1.6.0-era stats for this column before dispatching ops so the per-op handlers can read
+      // them off `ColumnMetadata` when they construct their forward-index creator. No-op when the column is
+      // fixed-width, already has the stats, or has no forward index on disk.
+      backfillMissingStats(column, segmentWriter);
+
       for (Operation operation : operations) {
         switch (operation) {
           case DISABLE_FORWARD_INDEX:
@@ -196,7 +199,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
             break;
           case ENABLE_DICTIONARY:
             if (_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getEncodingType()
-                == FieldConfig.EncodingType.DICTIONARY) {
+                == EncodingType.DICTIONARY) {
               createDictBasedForwardIndex(column, segmentWriter);
             } else {
               createDictionaryForRawForwardIndex(column, segmentWriter);
@@ -218,19 +221,20 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
   @VisibleForTesting
   Map<String, List<Operation>> computeOperations(SegmentDirectory.Reader 
segmentReader)
       throws Exception {
-    Map<String, List<Operation>> columnOperationsMap = new HashMap<>();
+    SegmentMetadataImpl segmentMetadata = 
_segmentDirectory.getSegmentMetadata();
 
     // Does not work for segment versions < V3.
-    if 
(_segmentDirectory.getSegmentMetadata().getVersion().compareTo(SegmentVersion.v3)
 < 0) {
-      return columnOperationsMap;
+    if (segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
+      return Map.of();
     }
 
-    Set<String> existingAllColumns = 
_segmentDirectory.getSegmentMetadata().getSchema().getPhysicalColumnNames();
+    Map<String, List<Operation>> columnOperationsMap = new HashMap<>();
+    Set<String> existingAllColumns = 
segmentMetadata.getSchema().getPhysicalColumnNames();
     Set<String> existingDictColumns = 
_segmentDirectory.getColumnsWithIndex(StandardIndexes.dictionary());
     Set<String> existingForwardIndexColumns = 
_segmentDirectory.getColumnsWithIndex(StandardIndexes.forward());
     Set<String> existingInvertedIndexColumns =
         
segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.inverted());
-    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+    String segmentName = segmentMetadata.getName();
 
     for (String column : existingAllColumns) {
       if (!_schema.hasColumn(column)) {
@@ -246,14 +250,15 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
 
       // Forward-index encoding is the source-of-truth for "does forward exist 
and how is it laid out". Three
       // states: DICTIONARY (dict-encoded forward), RAW (raw forward), null 
(forward disabled / not on disk).
-      FieldConfig.EncodingType existingFwdEncoding = 
existingForwardIndexColumns.contains(column)
-          ? 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column).getForwardIndexEncoding()
 : null;
+      EncodingType existingFwdEncoding = 
existingForwardIndexColumns.contains(column)
+          ? 
segmentMetadata.getColumnMetadataFor(column).getForwardIndexEncoding()
+          : null;
       ForwardIndexConfig newFwdConf = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-      FieldConfig.EncodingType newFwdEncoding = newFwdConf.isEnabled() ? 
newFwdConf.getEncodingType() : null;
+      EncodingType newFwdEncoding = newFwdConf.isEnabled() ? 
newFwdConf.getEncodingType() : null;
 
-      List<Operation> ops = computeColumnOperations(column, fieldSpec, 
segmentReader,
-          existingDictColumns.contains(column), existingFwdEncoding, 
newFwdEncoding,
-          existingInvertedIndexColumns.contains(column), segmentName);
+      List<Operation> ops =
+          computeColumnOperations(column, fieldSpec, segmentReader, 
existingDictColumns.contains(column),
+              existingFwdEncoding, newFwdEncoding, 
existingInvertedIndexColumns.contains(column), segmentName);
       if (!ops.isEmpty()) {
         columnOperationsMap.put(column, ops);
       }
@@ -280,7 +285,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   ///    forward to be on so the dict can be bootstrapped.
   private List<Operation> computeColumnOperations(String column, FieldSpec 
fieldSpec,
       SegmentDirectory.Reader segmentReader, boolean existingHasDict,
-      @Nullable FieldConfig.EncodingType existingFwdEncoding, @Nullable 
FieldConfig.EncodingType newFwdEncoding,
+      @Nullable EncodingType existingFwdEncoding, @Nullable EncodingType 
newFwdEncoding,
       boolean existingHasInverted, String segmentName)
       throws Exception {
     FieldIndexConfigs newConf = _fieldIndexConfigs.get(column);
@@ -326,8 +331,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
               existingHasInverted ? "enabled" : "disabled");
           return ops;
         }
-        ops.add(newFwdEncoding == FieldConfig.EncodingType.RAW
-            ? Operation.ENABLE_RAW_FORWARD_INDEX : 
Operation.ENABLE_DICT_FORWARD_INDEX);
+        ops.add(newFwdEncoding == EncodingType.RAW
+            ? Operation.ENABLE_RAW_FORWARD_INDEX
+            : Operation.ENABLE_DICT_FORWARD_INDEX);
         return ops;
       }
     } else if (existingHasFwd && existingFwdEncoding != newFwdEncoding && 
existingHasDict == desiredDict) {
@@ -343,8 +349,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
             + "ignoring", column, segmentName);
         return ops;
       }
-      ops.add(newFwdEncoding == FieldConfig.EncodingType.RAW
-          ? Operation.ENABLE_RAW_FORWARD_INDEX : 
Operation.ENABLE_DICT_FORWARD_INDEX);
+      ops.add(newFwdEncoding == EncodingType.RAW
+          ? Operation.ENABLE_RAW_FORWARD_INDEX
+          : Operation.ENABLE_DICT_FORWARD_INDEX);
     }
 
     // 2. Dictionary transition.
@@ -382,7 +389,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     // 3. Compression-type change (only when no encoding change happened).
     if (ops.isEmpty() && existingFwdEncoding != null && existingFwdEncoding == 
newFwdEncoding
         && existingHasDict == desiredDict) {
-      if (existingFwdEncoding == FieldConfig.EncodingType.RAW) {
+      if (existingFwdEncoding == EncodingType.RAW) {
         // TODO: Also check if raw index version needs to be changed
         if (shouldChangeRawCompressionType(column, segmentReader)) {
           ops.add(Operation.CHANGE_INDEX_COMPRESSION_TYPE);
@@ -425,10 +432,11 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
     LOGGER.info("{}: Updating metadata properties for segment={} and 
column={}", reason, segmentName, column);
     Map<String, String> metadataProperties = new HashMap<>();
     metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), 
String.valueOf(false));
-    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
FieldConfig.EncodingType.RAW.name());
+    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
EncodingType.RAW.name());
     metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), 
String.valueOf(0));
     // TODO: See https://github.com/apache/pinot/pull/16921 for details
-    // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), 
String.valueOf(-1));
+    // TODO: Remove the property after 1.6.0 release
+    // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), null);
     SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, 
metadataProperties);
 
     // Remove the inverted index, FST index and range index
@@ -439,7 +447,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
 
   private boolean isForwardIndexDictionaryEncoded(String column) {
     return 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column).getForwardIndexEncoding()
-        == FieldConfig.EncodingType.DICTIONARY;
+        == EncodingType.DICTIONARY;
   }
 
   /// Returns {@code true} when removing the dictionary on this column would 
leave another enabled index
@@ -532,8 +540,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
 
   private void rewriteForwardIndexForCompressionChange(String column, 
SegmentDirectory.Writer segmentWriter)
       throws Exception {
-    ColumnMetadata existingColMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
-    boolean isSingleValue = existingColMetadata.isSingleValue();
+    ColumnMetadata columnMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+    boolean isSingleValue = columnMetadata.isSingleValue();
     boolean isDictionaryEncodedForwardIndex = 
isForwardIndexDictionaryEncoded(column);
 
     File indexDir = _segmentDirectory.getSegmentMetadata().getIndexDir();
@@ -562,7 +570,24 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     }
 
     LOGGER.info("Creating new forward index for segment={} and column={}", 
segmentName, column);
-    rewriteForwardIndexForCompressionChange(column, existingColMetadata, 
indexDir, segmentWriter);
+    // The encoding does not change during a compression-change rewrite, so 
`_fieldIndexConfigs` already carries
+    // the correct encoding for the new forward index. The 
`IndexCreationContext.Builder` constructor pulls
+    // length stats from `ColumnMetadata`; `updateIndices` has already 
backfilled them for this column.
+    IndexReaderFactory<ForwardIndexReader> readerFactory = 
StandardIndexes.forward().getReaderFactory();
+    try (ForwardIndexReader<?> reader = 
readerFactory.createIndexReader(segmentWriter, _fieldIndexConfigs.get(column),
+        columnMetadata)) {
+      IndexCreationContext context = new 
IndexCreationContext.Builder(indexDir, _tableConfig, columnMetadata).build();
+      ForwardIndexConfig config = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
+      try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(context, config)) {
+        if (!reader.getStoredType().equals(creator.getValueType())) {
+          // Creator stored type should match reader stored type for raw 
columns. We do not support changing datatypes.
+          throw new UnsupportedOperationException(
+              "Unsupported operation to change datatype for column=" + column 
+ " from " + reader.getStoredType()
+                  + " to " + creator.getValueType());
+        }
+        forwardIndexRewriteHelper(column, columnMetadata, reader, creator, 
columnMetadata.getTotalDocs(), null, null);
+      }
+    }
 
     // We used the existing forward index to generate a new forward index. The 
existing forward index will be in V3
     // format and the new forward index will be in V1 format. Remove the 
existing forward index as it is not needed
@@ -578,57 +603,6 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     LOGGER.info("Created forward index for segment: {}, column: {}", 
segmentName, column);
   }
 
-  private void rewriteForwardIndexForCompressionChange(String column, 
ColumnMetadata columnMetadata, File indexDir,
-      SegmentDirectory.Writer segmentWriter)
-      throws Exception {
-    // Get the forward index reader factory and create a reader
-    IndexReaderFactory<ForwardIndexReader> readerFactory = 
StandardIndexes.forward().getReaderFactory();
-    try (ForwardIndexReader<?> reader = 
readerFactory.createIndexReader(segmentWriter, _fieldIndexConfigs.get(column),
-        columnMetadata)) {
-      IndexCreationContext.Builder builder = new 
IndexCreationContext.Builder(indexDir, _tableConfig, columnMetadata);
-      // Encoding flows through ForwardIndexConfig; for compression-change 
rewrite the encoding does not change so
-      // the config in _fieldIndexConfigs already carries the correct encoding.
-      // Set entry length info for raw index creators. No need to set this 
when changing dictionary id compression type.
-      if (!reader.isDictionaryEncoded() && 
!columnMetadata.getDataType().getStoredType().isFixedWidth()) {
-        int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-        if (lengthOfLongestEntry < 0) {
-          // When this info is not available from the reader, we need to scan 
the column.
-          lengthOfLongestEntry = getMaxRowLength(columnMetadata, reader, null);
-        }
-        if (columnMetadata.isSingleValue()) {
-          builder.withLengthOfLongestElement(lengthOfLongestEntry);
-        } else {
-          // For VarByte MV columns like String and Bytes, the storage 
representation of each row contains the following
-          // components:
-          // 1. bytes required to store the actual elements of the MV row (A)
-          // 2. bytes required to store the number of elements in the MV row 
(B)
-          // 3. bytes required to store the length of each MV element (C)
-          //
-          // lengthOfLongestEntry = A + B + C
-          // maxRowLengthInBytes = A
-          int maxNumValuesPerEntry = 
columnMetadata.getMaxNumberOfMultiValues();
-          int maxRowLengthInBytes =
-              
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
 maxNumValuesPerEntry);
-          builder.withMaxRowLengthInBytes(maxRowLengthInBytes);
-        }
-      }
-      ForwardIndexConfig config = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-      IndexCreationContext context = builder.build();
-      try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(context, config)) {
-        if (!reader.getStoredType().equals(creator.getValueType())) {
-          // Creator stored type should match reader stored type for raw 
columns. We do not support changing datatypes.
-          String failureMsg =
-              "Unsupported operation to change datatype for column=" + column 
+ " from " + reader.getStoredType()
-                  .toString() + " to " + creator.getValueType().toString();
-          throw new UnsupportedOperationException(failureMsg);
-        }
-
-        int numDocs = columnMetadata.getTotalDocs();
-        forwardIndexRewriteHelper(column, columnMetadata, reader, creator, 
numDocs, null, null);
-      }
-    }
-  }
-
   private void forwardIndexRewriteHelper(String column, ColumnMetadata 
existingColumnMetadata,
       ForwardIndexReader<?> reader, ForwardIndexCreator creator, int numDocs,
       @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable 
Dictionary dictionaryReader) {
@@ -999,7 +973,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     LOGGER.info("Created forwardIndex. Updating metadata properties for 
segment={} and column={}", segmentName, column);
     Map<String, String> metadataProperties = new HashMap<>();
     metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), 
String.valueOf(true));
-    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
FieldConfig.EncodingType.DICTIONARY.name());
+    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
EncodingType.DICTIONARY.name());
     metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE),
         String.valueOf(dictionaryCreator.getNumBytesPerEntry()));
     // If realtime segments were completed when the column was RAW, the 
cardinality value is populated as Integer
@@ -1074,7 +1048,7 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
         "Dictionary cardinality for column %s must be > 0 to compute 
BITS_PER_ELEMENT", column);
     Map<String, String> metadataProperties = new HashMap<>();
     metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), 
String.valueOf(true));
-    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
FieldConfig.EncodingType.RAW.name());
+    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
EncodingType.RAW.name());
     metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), 
String.valueOf(dictionaryElementSize));
     metadataProperties.put(getKeyFor(column, CARDINALITY), 
String.valueOf(dictionaryCardinality));
     metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT),
@@ -1134,10 +1108,11 @@ public class ForwardIndexHandler extends 
BaseIndexHandler {
         column);
     Map<String, String> metadataProperties = new HashMap<>();
     metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), 
String.valueOf(false));
-    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
FieldConfig.EncodingType.RAW.name());
+    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
EncodingType.RAW.name());
     metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), 
String.valueOf(0));
     // TODO: See https://github.com/apache/pinot/pull/16921 for details
-    // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), 
String.valueOf(-1));
+    // TODO: Remove the property after 1.6.0 release
+    // metadataProperties.put(getKeyFor(column, BITS_PER_ELEMENT), null);
     SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, 
metadataProperties);
 
     // Remove range index, inverted index and FST index.
@@ -1183,7 +1158,7 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
     LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, 
StandardIndexes.forward());
 
     Map<String, String> metadataProperties = new HashMap<>();
-    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
FieldConfig.EncodingType.RAW.name());
+    metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING), 
EncodingType.RAW.name());
     SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, 
metadataProperties);
 
     FileUtils.deleteQuietly(inProgress);
@@ -1194,56 +1169,76 @@ public class ForwardIndexHandler extends 
BaseIndexHandler {
       File indexDir)
       throws Exception {
     String column = columnMetadata.getColumnName();
-    // Get the forward index reader factory and create a reader
-    IndexReaderFactory<ForwardIndexReader> readerFactory = 
StandardIndexes.forward().getReaderFactory();
-    try (ForwardIndexReader<?> reader = 
readerFactory.createIndexReader(segmentWriter, _fieldIndexConfigs.get(column),
-        columnMetadata)) {
-      Dictionary dictionary = DictionaryIndexType.read(segmentWriter, 
columnMetadata);
-      IndexCreationContext.Builder builder =
-          new IndexCreationContext.Builder(indexDir, _tableConfig, 
columnMetadata).withDictionary(false);
-      // Encoding flows through ForwardIndexConfig set below. The row length 
is derived from persisted metadata via
-      // the Builder constructor; only fall back to scanning when metadata 
returns UNAVAILABLE (var-width MV columns
-      // with varying element lengths).
-      if (columnMetadata.getMaxRowLengthInBytes() == 
ColumnMetadata.UNAVAILABLE) {
-        builder.withMaxRowLengthInBytes(getMaxRowLength(columnMetadata, 
reader, dictionary));
-      }
-      ForwardIndexConfig config = 
_fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-      try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(builder.build(), config)) {
-        forwardIndexRewriteHelper(column, columnMetadata, reader, creator, 
columnMetadata.getTotalDocs(), null,
+    FieldIndexConfigs indexConfigs = _fieldIndexConfigs.get(column);
+    try (ForwardIndexReader<?> forwardIndex = 
StandardIndexes.forward().getReaderFactory()
+        .createIndexReader(segmentWriter, indexConfigs, columnMetadata);
+        Dictionary dictionary = DictionaryIndexType.read(segmentWriter, 
columnMetadata)) {
+      IndexCreationContext context = new 
IndexCreationContext.Builder(indexDir, _tableConfig, columnMetadata).build();
+      ForwardIndexConfig config = 
indexConfigs.getConfig(StandardIndexes.forward());
+      try (ForwardIndexCreator creator = 
StandardIndexes.forward().createIndexCreator(context, config)) {
+        forwardIndexRewriteHelper(column, columnMetadata, forwardIndex, 
creator, columnMetadata.getTotalDocs(), null,
             dictionary);
       }
     }
   }
 
-  /**
-   * Returns the max row length for a column.
-   * - For SV column, this is the length of the longest value.
-   * - For MV column, this is the length of the longest MV entry (sum of 
lengths of all elements).
-   */
-  private int getMaxRowLength(ColumnMetadata columnMetadata, 
ForwardIndexReader<?> forwardIndex,
-      @Nullable Dictionary dictionary)
-      throws IOException {
-    String column = columnMetadata.getColumnName();
+  /// Scans the column and persists the 1.6.0-era stats — 
`lengthOfShortestElement`, `lengthOfLongestElement`,
+  /// `isAscii` (STRING) and `maxRowLengthInBytes` (MV) — when they are 
missing from segment metadata. After this
+  /// returns, per-op handlers can read those stats off [ColumnMetadata] when 
they construct their forward-index
+  /// creator.
+  ///
+  /// The trigger probes the most-recently-added stat for the column's shape 
(`lengthOfShortestElement` for SV,
+  /// `maxRowLengthInBytes` for MV) because its presence implies the older 
1.6.0 stats are also present (stats are
+  /// persisted as a set on the writer side, so the newest key never appears 
without the older ones). The backfill is a
+  /// no-op when:
+  /// - the column is fixed-width (length stats derive from 
`storedType.size()`);
+  /// - the trigger stat is already present in metadata;
+  /// - the column has no forward index on disk (those are rebuilt via
+  ///   [InvertedIndexAndDictionaryBasedForwardIndexCreator], which handles 
its own backfill).
+  private void backfillMissingStats(String column, SegmentDirectory.Writer 
segmentWriter)
+      throws Exception {
+    ColumnMetadata columnMetadata = 
_segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
     DataType storedType = columnMetadata.getDataType().getStoredType();
-    assert !storedType.isFixedWidth();
-
-    try (PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(forwardIndex, dictionary, null,
-        columnMetadata.getMaxNumberOfMultiValues())) {
-      AbstractColumnStatisticsCollector statsCollector = 
getStatsCollector(column, storedType);
-      int numDocs = columnMetadata.getTotalDocs();
-      for (int i = 0; i < numDocs; i++) {
-        statsCollector.collect(columnReader.getValue(i));
+    boolean singleValue = columnMetadata.isSingleValue();
+    if (storedType.isFixedWidth()
+        || (singleValue && columnMetadata.getLengthOfShortestElement() >= 0)
+        || (!singleValue && columnMetadata.getMaxRowLengthInBytes() >= 0)) {
+      return;
+    }
+    try (ForwardIndexReader<?> forwardIndex = 
StandardIndexes.forward().getReaderFactory()
+        .createIndexReader(segmentWriter, _fieldIndexConfigs.get(column), 
columnMetadata)) {
+      if (forwardIndex == null) {
+        return;
+      }
+      boolean dictionaryEncoded = forwardIndex.isDictionaryEncoded();
+      try (Dictionary dictionary = dictionaryEncoded ? 
DictionaryIndexType.read(segmentWriter, columnMetadata) : null;
+          PinotSegmentColumnReader columnReader = new 
PinotSegmentColumnReader(forwardIndex, dictionary, null,
+              columnMetadata.getMaxNumberOfMultiValues())) {
+        AbstractColumnStatisticsCollector statsCollector = 
getStatsCollector(column, storedType, false);
+        int numDocs = columnMetadata.getTotalDocs();
+        assert numDocs > 0;
+        for (int i = 0; i < numDocs; i++) {
+          statsCollector.collect(columnReader.getValue(i));
+        }
+        Map<String, String> metadataProperties = new HashMap<>();
+        metadataProperties.put(getKeyFor(column, FORWARD_INDEX_ENCODING),
+            dictionaryEncoded ? EncodingType.DICTIONARY.name() : 
EncodingType.RAW.name());
+        metadataProperties.put(getKeyFor(column, LENGTH_OF_SHORTEST_ELEMENT),
+            String.valueOf(statsCollector.getLengthOfShortestElement()));
+        metadataProperties.put(getKeyFor(column, LENGTH_OF_LONGEST_ELEMENT),
+            String.valueOf(statsCollector.getLengthOfLongestElement()));
+        if (storedType == DataType.STRING) {
+          metadataProperties.put(getKeyFor(column, IS_ASCII), 
String.valueOf(statsCollector.isAscii()));
+        }
+        if (!singleValue) {
+          metadataProperties.put(getKeyFor(column, MAX_ROW_LENGTH_IN_BYTES),
+              String.valueOf(statsCollector.getMaxRowLengthInBytes()));
+        }
+        SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, 
metadataProperties);
       }
-      // NOTE: No need to seal the stats collector because value length is 
updated while collecting stats.
-      return columnMetadata.isSingleValue() ? 
statsCollector.getLengthOfLongestElement()
-          : statsCollector.getMaxRowLengthInBytes();
     }
   }
 
-  private AbstractColumnStatisticsCollector getStatsCollector(String column, 
DataType storedType) {
-    return getStatsCollector(column, storedType, false);
-  }
-
   /// `requireUniqueValues=true` forces a per-type collector even when the 
no-dict optimization would apply.
   /// Callers building a dictionary out of the raw values must opt in — 
`_fieldIndexConfigs` may still report
   /// `dictionary=disabled` when the dictionary requirement was derived from a 
secondary-index need rather
@@ -1252,9 +1247,7 @@ public class ForwardIndexHandler extends BaseIndexHandler 
{
   private AbstractColumnStatisticsCollector getStatsCollector(String column, 
DataType storedType,
       boolean requireUniqueValues) {
     StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
-    boolean dictionaryEnabled = hasIndex(column, StandardIndexes.dictionary());
-    // MAP collector is optimised for no-dictionary collection.
-    if (!requireUniqueValues && !dictionaryEnabled && storedType != 
DataType.MAP) {
+    if (!requireUniqueValues && storedType != DataType.MAP) {
       if (ClusterConfigForTable.useOptimizedNoDictCollector(_tableConfig)) {
         return new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
       }
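
Reviewer note on the `backfillMissingStats` trigger above: the early-return conditions can be read as a single predicate. The following is only a minimal sketch of that decision, not the Pinot implementation. `BackfillTriggerSketch` and its parameters are hypothetical stand-ins for the few `ColumnMetadata` getters the diff reads, and it assumes (as the `>= 0` checks suggest) that a missing stat reads back as a negative value. The third no-op condition, a column with no forward index on disk, is left out because it depends on the segment directory rather than on metadata.

```java
/** Hypothetical stand-in for the metadata checks at the top of backfillMissingStats. */
final class BackfillTriggerSketch {
  /** Assumption: stats that are absent from metadata.properties read back as a negative value. */
  static final int MISSING = -1;

  private BackfillTriggerSketch() {
  }

  /**
   * Returns true when a column scan is needed to recover the length stats.
   * SV columns probe lengthOfShortestElement; MV columns probe maxRowLengthInBytes.
   */
  static boolean needsBackfill(boolean fixedWidth, boolean singleValue, int lengthOfShortestElement,
      int maxRowLengthInBytes) {
    if (fixedWidth) {
      // Length stats derive from the stored type's size, so there is nothing to scan for.
      return false;
    }
    int triggerStat = singleValue ? lengthOfShortestElement : maxRowLengthInBytes;
    return triggerStat < 0;
  }

  public static void main(String[] args) {
    // MV column that already has shortest/longest but lacks maxRowLengthInBytes.
    System.out.println(needsBackfill(false, false, 3, MISSING));
    // SV column with complete stats.
    System.out.println(needsBackfill(false, true, 3, MISSING));
  }
}
```

The MV case in `main` mirrors `testBackfillMissingStatsForMvViaMaxRowLengthTrigger`: probing only `lengthOfShortestElement` would skip such a segment, which is why the trigger differs by column shape.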
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java
index 9ffc34b5089..521d4e822c9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java
@@ -32,6 +32,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.IndexHandler;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -40,6 +41,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
+import org.apache.pinot.spi.config.table.FieldConfig.EncodingType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.roaringbitmap.PeekableIntIterator;
@@ -82,10 +84,10 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
   private final SegmentDirectory.Writer _segmentWriter;
   private final TableConfig _tableConfig;
   private final String _columnName;
-  /// `true` if a standalone dictionary file exists for the column. 
Independent of how the forward index is
-  /// encoded — both the standard "dict-encoded forward index" and the new 
"shared-dictionary + raw forward index"
-  /// configurations have `_dictionaryPresent == true`.
-  private final boolean _dictionaryPresent;
+  /// `true` when the post-rebuild config keeps a standalone dictionary file 
for the column. Set to `false` only
+  /// when the dictionary is being dropped as part of the rebuild — 
independent of whether the new forward index
+  /// is dict-encoded or raw, since a raw forward index can also share an 
existing dictionary.
+  private final boolean _keepDictionary;
   private final ForwardIndexConfig _forwardIndexConfig;
   private final boolean _isTemporaryForwardIndex;
 
@@ -116,32 +118,32 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
   private PinotDataBuffer _forwardIndexMaxSizeBuffer;
 
   public InvertedIndexAndDictionaryBasedForwardIndexCreator(SegmentDirectory 
segmentDirectory,
-      SegmentDirectory.Writer segmentWriter, TableConfig tableConfig, String 
columnName, boolean dictionaryPresent,
-      boolean dictionaryBasedForwardIndex, ForwardIndexConfig 
forwardIndexConfig, boolean isTemporaryForwardIndex)
+      SegmentDirectory.Writer segmentWriter, TableConfig tableConfig, String 
columnName,
+      FieldIndexConfigs fieldIndexConfigs, boolean isTemporaryForwardIndex)
       throws IOException {
     _segmentDirectory = segmentDirectory;
     _segmentWriter = segmentWriter;
     _tableConfig = tableConfig;
     _columnName = columnName;
-    _dictionaryPresent = dictionaryPresent;
-    _forwardIndexConfig = forwardIndexConfig;
+    _keepDictionary = 
fieldIndexConfigs.getConfig(StandardIndexes.dictionary()).isEnabled();
+    _forwardIndexConfig = 
fieldIndexConfigs.getConfig(StandardIndexes.forward());
     _isTemporaryForwardIndex = isTemporaryForwardIndex;
 
     _segmentMetadata = segmentDirectory.getSegmentMetadata();
     _columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
     _singleValue = _columnMetadata.isSingleValue();
-    _cardinality = _columnMetadata.getCardinality();
     _numDocs = _columnMetadata.getTotalDocs();
+    _cardinality = _columnMetadata.getCardinality();
+    assert _numDocs > 0 && _cardinality > 0;
     _totalNumberOfEntries = _columnMetadata.getTotalNumberOfEntries();
     _maxNumberOfMultiValues = _columnMetadata.getMaxNumberOfMultiValues();
-    int numValues = _singleValue ? _numDocs : _totalNumberOfEntries;
-    _useMMapBuffer = numValues > NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
+    _useMMapBuffer = _totalNumberOfEntries > 
NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER;
 
     // Sorted columns should never need recreation of the forward index as the 
forwardIndexDisabled flag is treated as
     // a no-op for sorted columns
     File indexDir = _segmentMetadata.getIndexDir();
     String fileExtension;
-    if (dictionaryBasedForwardIndex) {
+    if (_forwardIndexConfig.getEncodingType() == EncodingType.DICTIONARY) {
       fileExtension = _singleValue ? 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
           : V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
     } else {
@@ -155,7 +157,8 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
 
     // Create the temporary buffers needed
     try {
-      _forwardIndexValueBuffer = createTempBuffer((long) numValues * 
Integer.BYTES, _forwardIndexValueBufferFile);
+      _forwardIndexValueBuffer =
+          createTempBuffer((long) _totalNumberOfEntries * Integer.BYTES, 
_forwardIndexValueBufferFile);
       if (!_singleValue) {
         _forwardIndexLengthBuffer = createTempBuffer((long) _numDocs * 
Integer.BYTES, _forwardIndexLengthBufferFile);
         for (int i = 0; i < _numDocs; i++) {
@@ -230,7 +233,7 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
       // Only cleanup the other indexes if the forward index to be created is 
permanent. If the forward index is
       // temporary, it is meant to be used only for construction of other 
indexes and will be deleted once all the
       // IndexHandlers have completed.
-      if (!_dictionaryPresent) {
+      if (!_keepDictionary) {
         LOGGER.info("Clean up indexes no longer needed or which need to be 
rewritten for segment: {}, column: {}",
             segmentName, _columnName);
         // Delete the dictionary
@@ -255,33 +258,61 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
         (BitmapInvertedIndexReader) InvertedIndexType.ReaderFactory
             .INSTANCE.createSkippingForward(_segmentWriter, _columnMetadata);
         Dictionary dictionary = DictionaryIndexType.read(_segmentWriter, 
_columnMetadata)) {
-      // Construct the forward index in the values buffer
+      // Construct the forward index in the values buffer. For var-length 
columns, also gather per-element stats
+      // (lengthOfShortest/Longest, isAscii for STRING) inline when the source 
segment is missing them, so the
+      // backfill happens without a second dictionary scan.
+      DataType storedType = _columnMetadata.getStoredType();
+      boolean backfillStats =
+          !storedType.isFixedWidth() && 
_columnMetadata.getLengthOfShortestElement() < 0;
+      int lengthOfShortestElement = Integer.MAX_VALUE;
+      int lengthOfLongestElement = 0;
+      boolean isAscii = storedType == DataType.STRING;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
         ImmutableRoaringBitmap docIdsBitmap = 
invertedIndexReader.getDocIds(dictId);
         int finalDictId = dictId;
         docIdsBitmap.stream().forEach(docId -> 
putInt(_forwardIndexValueBuffer, docId, finalDictId));
+        if (backfillStats) {
+          int valueSize = dictionary.getValueSize(dictId);
+          lengthOfShortestElement = Math.min(lengthOfShortestElement, 
valueSize);
+          lengthOfLongestElement = Math.max(lengthOfLongestElement, valueSize);
+          if (isAscii) {
+            isAscii = valueSize == dictionary.getStringValue(dictId).length();
+          }
+        }
       }
 
-      IndexCreationContext context =
-          new IndexCreationContext.Builder(_segmentMetadata.getIndexDir(), 
_tableConfig, _columnMetadata)
-              .withDictionary(_dictionaryPresent)
-              .build();
+      IndexCreationContext.Builder builder =
+          new IndexCreationContext.Builder(_segmentMetadata.getIndexDir(), 
_tableConfig, _columnMetadata);
+      if (backfillStats) {
+        builder.withLengthOfShortestElement(lengthOfShortestElement);
+        builder.withLengthOfLongestElement(lengthOfLongestElement);
+        builder.withAscii(isAscii);
+      }
 
-      // note: this method closes buffers and removes files
-      writeToForwardIndex(dictionary, context);
+      // NOTE: this method closes buffers and removes files
+      writeToForwardIndex(dictionary, builder.build());
 
       // Setup and return the metadata properties to update
-      if (_dictionaryPresent) {
-        return Map.of(getKeyFor(_columnName, FORWARD_INDEX_ENCODING),
-            _forwardIndexConfig.getEncodingType().name());
+      Map<String, String> metadataProperties = new HashMap<>();
+      metadataProperties.put(getKeyFor(_columnName, FORWARD_INDEX_ENCODING),
+          _forwardIndexConfig.getEncodingType().name());
+      if (!_keepDictionary) {
+        metadataProperties.put(getKeyFor(_columnName, HAS_DICTIONARY), 
String.valueOf(false));
+        metadataProperties.put(getKeyFor(_columnName, 
DICTIONARY_ELEMENT_SIZE), String.valueOf(0));
+        // TODO: See https://github.com/apache/pinot/pull/16921 for details
+        // TODO: Remove the property after 1.6.0 release
+        // metadataProperties.put(getKeyFor(_columnName, BITS_PER_ELEMENT), 
null);
       }
-      return Map.of(
-          getKeyFor(_columnName, HAS_DICTIONARY), String.valueOf(false),
-          getKeyFor(_columnName, FORWARD_INDEX_ENCODING), 
_forwardIndexConfig.getEncodingType().name(),
-          getKeyFor(_columnName, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)
-          // TODO: See https://github.com/apache/pinot/pull/16921 for details
-          // getKeyFor(_columnName, BITS_PER_ELEMENT), String.valueOf(-1)
-      );
+      if (backfillStats) {
+        metadataProperties.put(getKeyFor(_columnName, 
LENGTH_OF_SHORTEST_ELEMENT),
+            String.valueOf(lengthOfShortestElement));
+        metadataProperties.put(getKeyFor(_columnName, 
LENGTH_OF_LONGEST_ELEMENT),
+            String.valueOf(lengthOfLongestElement));
+        if (storedType == DataType.STRING) {
+          metadataProperties.put(getKeyFor(_columnName, IS_ASCII), 
String.valueOf(isAscii));
+        }
+      }
+      return metadataProperties;
     }
   }
 
@@ -329,13 +360,28 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
         forwardValueIndex += length;
       }
 
-      // Construct the forward index values buffer from the inverted index 
using the length buffer for index tracking
+      // Construct the forward index values buffer from the inverted index 
using the length buffer for index tracking.
+      // For var-length columns, also track per-element stats 
(lengthOfShortest/Longest, isAscii for STRING) when
+      // the source segment is missing them, so we can backfill those metadata 
keys without a second dictionary scan.
       DataType storedType = _columnMetadata.getStoredType();
       boolean isFixedWidth = storedType.isFixedWidth();
-      int maxRowLengthInBytes = isFixedWidth ? maxNumberOfMultiValues * 
storedType.size() : 0;
+      int fixedSize = isFixedWidth ? storedType.size() : 0;
+      int maxRowLengthInBytes = isFixedWidth ? maxNumberOfMultiValues * 
fixedSize : 0;
+      boolean backfillStats = !isFixedWidth && 
_columnMetadata.getLengthOfShortestElement() < 0;
+      int lengthOfShortestElement = Integer.MAX_VALUE;
+      int lengthOfLongestElement = 0;
+      boolean isAscii = storedType == DataType.STRING;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
         ImmutableRoaringBitmap docIdsBitmap = 
invertedIndexReader.getDocIds(dictId);
         PeekableIntIterator intIterator = docIdsBitmap.getIntIterator();
+        int valueSize = isFixedWidth ? fixedSize : 
dictionary.getValueSize(dictId);
+        if (backfillStats) {
+          lengthOfShortestElement = Math.min(lengthOfShortestElement, 
valueSize);
+          lengthOfLongestElement = Math.max(lengthOfLongestElement, valueSize);
+          if (isAscii) {
+            isAscii = valueSize == dictionary.getStringValue(dictId).length();
+          }
+        }
         while (intIterator.hasNext()) {
           int docId = intIterator.next();
           int index = getInt(_forwardIndexLengthBuffer, docId);
@@ -343,7 +389,7 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
           putInt(_forwardIndexLengthBuffer, docId, index + 1);
           if (!isFixedWidth) {
             int currentRowLength = getInt(_forwardIndexMaxSizeBuffer, docId);
-            int newRowLength = currentRowLength + 
dictionary.getValueSize(dictId);
+            int newRowLength = currentRowLength + valueSize;
             putInt(_forwardIndexMaxSizeBuffer, docId, newRowLength);
             maxRowLengthInBytes = Math.max(maxRowLengthInBytes, newRowLength);
           }
@@ -354,26 +400,42 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
       // post-dedup values and differ from the source metadata; override so 
the forward index creator sees the correct
       // sizes (consistent with the persisted metadata properties below).
       File indexDir = _segmentMetadata.getIndexDir();
-      IndexCreationContext context = new 
IndexCreationContext.Builder(indexDir, _tableConfig, _columnMetadata)
+      IndexCreationContext.Builder builder = new 
IndexCreationContext.Builder(indexDir, _tableConfig, _columnMetadata)
           .withTotalNumberOfEntries(_nextValueId)
           .withMaxNumberOfMultiValues(maxNumberOfMultiValues)
-          .withMaxRowLengthInBytes(maxRowLengthInBytes)
-          .withDictionary(_dictionaryPresent)
-          .build();
+          .withMaxRowLengthInBytes(maxRowLengthInBytes);
+      if (backfillStats) {
+        builder.withLengthOfShortestElement(lengthOfShortestElement);
+        builder.withLengthOfLongestElement(lengthOfLongestElement);
+        builder.withAscii(isAscii);
+      }
 
-      writeToForwardIndex(dictionary, context);
+      writeToForwardIndex(dictionary, builder.build());
 
       // Setup and return the metadata properties to update
       Map<String, String> metadataProperties = new HashMap<>();
-      metadataProperties.put(getKeyFor(_columnName, MAX_MULTI_VALUE_ELEMENTS), 
String.valueOf(maxNumberOfMultiValues));
-      metadataProperties.put(getKeyFor(_columnName, TOTAL_NUMBER_OF_ENTRIES), 
String.valueOf(_nextValueId));
       metadataProperties.put(getKeyFor(_columnName, FORWARD_INDEX_ENCODING),
           _forwardIndexConfig.getEncodingType().name());
-      if (!_dictionaryPresent) {
+      if (!_keepDictionary) {
         metadataProperties.put(getKeyFor(_columnName, HAS_DICTIONARY), 
String.valueOf(false));
         metadataProperties.put(getKeyFor(_columnName, 
DICTIONARY_ELEMENT_SIZE), String.valueOf(0));
         // TODO: See https://github.com/apache/pinot/pull/16921 for details
-        // metadataProperties.put(getKeyFor(_columnName, BITS_PER_ELEMENT), 
String.valueOf(-1));
+        // TODO: Remove the property after 1.6.0 release
+        // metadataProperties.put(getKeyFor(_columnName, BITS_PER_ELEMENT), 
null);
+      }
+      metadataProperties.put(getKeyFor(_columnName, TOTAL_NUMBER_OF_ENTRIES), 
String.valueOf(_nextValueId));
+      metadataProperties.put(getKeyFor(_columnName, MAX_MULTI_VALUE_ELEMENTS), 
String.valueOf(maxNumberOfMultiValues));
+      if (!isFixedWidth) {
+        metadataProperties.put(getKeyFor(_columnName, 
MAX_ROW_LENGTH_IN_BYTES), String.valueOf(maxRowLengthInBytes));
+        if (backfillStats) {
+          metadataProperties.put(getKeyFor(_columnName, 
LENGTH_OF_SHORTEST_ELEMENT),
+              String.valueOf(lengthOfShortestElement));
+          metadataProperties.put(getKeyFor(_columnName, 
LENGTH_OF_LONGEST_ELEMENT),
+              String.valueOf(lengthOfLongestElement));
+          if (storedType == DataType.STRING) {
+            metadataProperties.put(getKeyFor(_columnName, IS_ASCII), 
String.valueOf(isAscii));
+          }
+        }
       }
       return metadataProperties;
     }
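
Reviewer note on the inline stats gathering above: the SV rebuild walks the dictionary once, writes each dictId into the forward slots named by its posting list, and updates shortest/longest/ASCII in the same pass. The sketch below illustrates that idea with plain arrays. It is an assumption-laden simplification, not the Pinot code: `InlineStatsRebuildSketch`, `rebuild`, and the `int[][]` posting lists are hypothetical replacements for the roaring bitmaps, temp buffers and `Dictionary` reader, and value sizes are taken as UTF-8 byte lengths. The ASCII test compares byte length with `String.length()`, which holds for well-formed strings.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, array-based illustration of rebuilding an SV forward index while gathering length stats. */
final class InlineStatsRebuildSketch {
  final int[] forwardIndex;
  final int lengthOfShortestElement;
  final int lengthOfLongestElement;
  final boolean ascii;

  private InlineStatsRebuildSketch(int[] forwardIndex, int shortest, int longest, boolean ascii) {
    this.forwardIndex = forwardIndex;
    this.lengthOfShortestElement = shortest;
    this.lengthOfLongestElement = longest;
    this.ascii = ascii;
  }

  /**
   * @param dictionary       dictionary values, indexed by dictId (must be non-empty)
   * @param docIdsPerDictId  posting list of docIds for each dictId
   * @param numDocs          number of documents in the segment
   */
  static InlineStatsRebuildSketch rebuild(String[] dictionary, int[][] docIdsPerDictId, int numDocs) {
    int[] forwardIndex = new int[numDocs];
    int shortest = Integer.MAX_VALUE;
    int longest = 0;
    boolean ascii = true;
    for (int dictId = 0; dictId < dictionary.length; dictId++) {
      // Invert the posting list: every doc that contains this value gets this dictId.
      for (int docId : docIdsPerDictId[dictId]) {
        forwardIndex[docId] = dictId;
      }
      // Gather per-element stats in the same pass, so no second dictionary scan is needed.
      String value = dictionary[dictId];
      int valueSize = value.getBytes(StandardCharsets.UTF_8).length;
      shortest = Math.min(shortest, valueSize);
      longest = Math.max(longest, valueSize);
      if (ascii) {
        // One byte per char means every char is in the ASCII range.
        ascii = valueSize == value.length();
      }
    }
    return new InlineStatsRebuildSketch(forwardIndex, shortest, longest, ascii);
  }

  public static void main(String[] args) {
    String[] dictionary = {"ab", "h\u00e9llo", "x"};
    int[][] postings = {{0, 3}, {1}, {2}};
    InlineStatsRebuildSketch result = rebuild(dictionary, postings, 4);
    System.out.println(java.util.Arrays.toString(result.forwardIndex));
    System.out.println(result.lengthOfShortestElement + " " + result.lengthOfLongestElement + " " + result.ascii);
  }
}
```

As in the diff, the stats are only meaningful when the dictionary is non-empty; the commit guards this with `assert _numDocs > 0 && _cardinality > 0`.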
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index f9b03936f50..cf0d4175141 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -62,6 +62,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -2547,6 +2548,131 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  @Test
+  public void testBackfillMissingStats()
+      throws Exception {
+    // For each var-length column, strip the 1.6.0-era stats from metadata to 
simulate a pre-1.6.0 segment, then
+    // trigger a compression change. The pre-pass in 
`ForwardIndexHandler.updateIndices` should backfill the
+    // missing stats from a column scan before any per-op handler runs.
+    for (String column : List.of(DIM_SNAPPY_STRING, 
DIM_MV_PASS_THROUGH_STRING)) {
+      ColumnMetadata expected;
+      try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+          SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+        _segmentDirectory = segmentDirectory;
+        _writer = writer;
+        expected = 
segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+        boolean isSingleValue = expected.isSingleValue();
+
+        // Strip the 1.6.0-era stats: shortest/longest, isAscii (STRING only), 
and maxRowLengthInBytes (MV only).
+        Map<String, String> stripped = new HashMap<>();
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.LENGTH_OF_SHORTEST_ELEMENT), null);
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.LENGTH_OF_LONGEST_ELEMENT), null);
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.IS_ASCII), null);
+        if (!isSingleValue) {
+          stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+              V1Constants.MetadataKeys.Column.MAX_ROW_LENGTH_IN_BYTES), null);
+        }
+        SegmentMetadataUtils.updateMetadataProperties(segmentDirectory, 
stripped);
+        ColumnMetadata afterStrip = 
segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+        assertTrue(afterStrip.getLengthOfShortestElement() < 0);
+        if (!isSingleValue) {
+          assertTrue(afterStrip.getMaxRowLengthInBytes() < 0);
+        }
+
+        // Trigger a compression change to drive `updateIndices`, which runs 
the backfill pre-pass.
+        _fieldConfigMap.put(column,
+            new FieldConfig(column, FieldConfig.EncodingType.RAW, List.of(), 
CompressionCodec.LZ4, null));
+        updateIndices();
+      }
+
+      // Reopen and verify the stats were backfilled to the values the 
original (pre-strip) build produced.
+      ColumnMetadata actual = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(column);
+      assertEquals(actual.getLengthOfShortestElement(), 
expected.getLengthOfShortestElement());
+      assertEquals(actual.getLengthOfLongestElement(), 
expected.getLengthOfLongestElement());
+      assertEquals(actual.isAscii(), expected.isAscii());
+      assertEquals(actual.getMaxRowLengthInBytes(), 
expected.getMaxRowLengthInBytes());
+    }
+  }
+
+  @Test
+  public void testBackfillMissingStatsForMvViaMaxRowLengthTrigger()
+      throws Exception {
+    // The pre-pass uses `getMaxRowLengthInBytes() >= 0` (not 
`getLengthOfShortestElement() >= 0`) as the trigger
+    // for MV columns, because `MAX_ROW_LENGTH_IN_BYTES` was added after the 
other 1.6.0 keys. A segment that
+    // already has shortest/longest but is missing only 
`MAX_ROW_LENGTH_IN_BYTES` should still get backfilled.
+    String column = DIM_MV_PASS_THROUGH_STRING;
+    ColumnMetadata expected;
+    try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+        SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+      _segmentDirectory = segmentDirectory;
+      _writer = writer;
+      expected = 
segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+
+      // Strip only MAX_ROW_LENGTH_IN_BYTES; shortest/longest/isAscii stay.
+      Map<String, String> stripped = new HashMap<>();
+      stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+          V1Constants.MetadataKeys.Column.MAX_ROW_LENGTH_IN_BYTES), null);
+      SegmentMetadataUtils.updateMetadataProperties(segmentDirectory, 
stripped);
+      ColumnMetadata afterStrip = 
segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+      assertTrue(afterStrip.getMaxRowLengthInBytes() < 0);
+      assertTrue(afterStrip.getLengthOfShortestElement() >= 0);
+
+      _fieldConfigMap.put(column,
+          new FieldConfig(column, FieldConfig.EncodingType.RAW, List.of(), 
CompressionCodec.LZ4, null));
+      updateIndices();
+    }
+
+    ColumnMetadata actual = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(column);
+    assertEquals(actual.getMaxRowLengthInBytes(), 
expected.getMaxRowLengthInBytes());
+  }
+
+  @Test
+  public void testBackfillFromInvertedIndexRebuild()
+      throws Exception {
+    // For a forward-index-disabled var-length column, strip the 1.6.0-era 
stats and then re-enable the forward
+    // index. The InvertedIndexAndDictionaryBasedForwardIndexCreator rebuild 
path tracks per-element stats inline
+    // from the dictionary and persists them as part of the metadata update.
+    for (String column : List.of(DIM_SV_FORWARD_INDEX_DISABLED_STRING, 
DIM_MV_FORWARD_INDEX_DISABLED_STRING)) {
+      ColumnMetadata expected;
+      try (SegmentDirectory segmentDirectory = new 
SegmentLocalFSDirectory(INDEX_DIR, ReadMode.mmap);
+          SegmentDirectory.Writer writer = segmentDirectory.createWriter()) {
+        _segmentDirectory = segmentDirectory;
+        _writer = writer;
+        expected = 
segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+        boolean isSingleValue = expected.isSingleValue();
+
+        Map<String, String> stripped = new HashMap<>();
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.LENGTH_OF_SHORTEST_ELEMENT), null);
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.LENGTH_OF_LONGEST_ELEMENT), null);
+        stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+            V1Constants.MetadataKeys.Column.IS_ASCII), null);
+        if (!isSingleValue) {
+          stripped.put(V1Constants.MetadataKeys.Column.getKeyFor(column,
+              V1Constants.MetadataKeys.Column.MAX_ROW_LENGTH_IN_BYTES), null);
+        }
+        SegmentMetadataUtils.updateMetadataProperties(segmentDirectory, 
stripped);
+
+        // Drop FORWARD_INDEX_DISABLED from the field config so 
`updateIndices` runs ENABLE_DICT_FORWARD_INDEX,
+        // which rebuilds the forward index via 
`InvertedIndexAndDictionaryBasedForwardIndexCreator`.
+        _fieldConfigMap.remove(column);
+        updateIndices();
+      }
+
+      ColumnMetadata actual = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(column);
+      assertEquals(actual.getLengthOfShortestElement(), 
expected.getLengthOfShortestElement());
+      assertEquals(actual.getLengthOfLongestElement(), 
expected.getLengthOfLongestElement());
+      assertEquals(actual.isAscii(), expected.isAscii());
+      if (!expected.isSingleValue()) {
+        assertEquals(actual.getMaxRowLengthInBytes(), 
expected.getMaxRowLengthInBytes());
+      }
+    }
+  }
+
   private FieldIndexConfigs createFieldIndexConfigsFromMetadata(ColumnMetadata 
columnMetadata) {
     FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder();
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandlerTest.java
index 568fbcfe6f0..5826883d9db 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/VectorIndexHandlerTest.java
@@ -33,10 +33,13 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Unit tests for {@link VectorIndexHandler} backend-drift handling.
@@ -50,12 +53,12 @@ public class VectorIndexHandlerTest {
     File indexDir = 
createSegmentDirWithVectorIndex(V1Constants.Indexes.VECTOR_V912_HNSW_INDEX_FILE_EXTENSION);
     try {
       SegmentDirectory segmentDirectory = mockSegmentDirectory(indexDir);
-      SegmentDirectory.Reader reader = 
Mockito.mock(SegmentDirectory.Reader.class);
-      Mockito.when(reader.toSegmentDirectory()).thenReturn(segmentDirectory);
+      SegmentDirectory.Reader reader = mock(SegmentDirectory.Reader.class);
+      when(reader.toSegmentDirectory()).thenReturn(segmentDirectory);
 
       VectorIndexHandler handler = createHandler(segmentDirectory, vectorIndexConfig("IVF_PQ"));
 
-      Assert.assertTrue(handler.needUpdateIndices(reader));
+      assertTrue(handler.needUpdateIndices(reader));
     } finally {
       FileUtils.deleteQuietly(indexDir);
     }
@@ -67,13 +70,13 @@ public class VectorIndexHandlerTest {
     File indexDir = createSegmentDirWithVectorIndex(V1Constants.Indexes.VECTOR_V912_HNSW_INDEX_FILE_EXTENSION);
     try {
       SegmentDirectory segmentDirectory = mockSegmentDirectory(indexDir);
-      SegmentDirectory.Writer writer = Mockito.mock(SegmentDirectory.Writer.class);
-      Mockito.when(writer.toSegmentDirectory()).thenReturn(segmentDirectory);
+      SegmentDirectory.Writer writer = mock(SegmentDirectory.Writer.class);
+      when(writer.toSegmentDirectory()).thenReturn(segmentDirectory);
 
       VectorIndexHandler handler = createHandler(segmentDirectory, vectorIndexConfig("IVF_PQ"));
       handler.updateIndices(writer);
 
-      Mockito.verify(writer).removeIndex(COLUMN, StandardIndexes.vector());
+      verify(writer).removeIndex(COLUMN, StandardIndexes.vector());
     } finally {
       FileUtils.deleteQuietly(indexDir);
     }
@@ -83,20 +86,21 @@ public class VectorIndexHandlerTest {
       VectorIndexConfig vectorIndexConfig) {
     FieldIndexConfigs fieldIndexConfigs =
         new FieldIndexConfigs.Builder().add(StandardIndexes.vector(), vectorIndexConfig).build();
-    return new VectorIndexHandler(segmentDirectory, Map.of(COLUMN, fieldIndexConfigs), Mockito.mock(TableConfig.class),
-        Mockito.mock(Schema.class));
+    return new VectorIndexHandler(segmentDirectory, Map.of(COLUMN, fieldIndexConfigs), mock(TableConfig.class),
+        mock(Schema.class));
   }
 
   private static SegmentDirectory mockSegmentDirectory(File indexDir) {
-    SegmentMetadataImpl segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
-    Mockito.when(segmentMetadata.getName()).thenReturn("testSegment");
-    Mockito.when(segmentMetadata.getIndexDir()).thenReturn(indexDir);
-    Mockito.when(segmentMetadata.getAllColumns()).thenReturn(new TreeSet<>(Set.of(COLUMN)));
-    Mockito.when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap<>());
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getName()).thenReturn("testSegment");
+    when(segmentMetadata.getIndexDir()).thenReturn(indexDir);
+    when(segmentMetadata.getTotalDocs()).thenReturn(10);
+    when(segmentMetadata.getAllColumns()).thenReturn(new TreeSet<>(Set.of(COLUMN)));
+    when(segmentMetadata.getColumnMetadataMap()).thenReturn(new TreeMap<>());
 
-    SegmentDirectory segmentDirectory = Mockito.mock(SegmentDirectory.class);
-    Mockito.when(segmentDirectory.getSegmentMetadata()).thenReturn(segmentMetadata);
-    Mockito.when(segmentDirectory.getColumnsWithIndex(StandardIndexes.vector())).thenReturn(Set.of(COLUMN));
+    SegmentDirectory segmentDirectory = mock(SegmentDirectory.class);
+    when(segmentDirectory.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(segmentDirectory.getColumnsWithIndex(StandardIndexes.vector())).thenReturn(Set.of(COLUMN));
     return segmentDirectory;
   }
 
@@ -109,9 +113,9 @@ public class VectorIndexHandlerTest {
       throws Exception {
     File indexDir = new File(FileUtils.getTempDirectory(), "vector-index-handler-test-" + System.nanoTime());
     FileUtils.deleteQuietly(indexDir);
-    Assert.assertTrue(indexDir.mkdirs());
+    assertTrue(indexDir.mkdirs());
     File v3Dir = new File(indexDir, SegmentDirectoryPaths.V3_SUBDIRECTORY_NAME);
-    Assert.assertTrue(v3Dir.mkdir());
+    assertTrue(v3Dir.mkdir());
     FileUtils.touch(new File(v3Dir, COLUMN + suffix));
     return indexDir;
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 86699135256..a6bb7da1103 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -163,10 +163,10 @@ public class V1Constants {
       public static final String BITS_PER_ELEMENT = "bitsPerElement";
       // Optional, exist for MV columns
       // TODO: Changed to optional on reader side in release 1.6.0. Change writer side after 1.6.0 release.
-      public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues";
+      public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
       // Optional, exist for MV columns
       // TODO: Changed to optional on reader side in release 1.6.0. Change writer side after 1.6.0 release.
-      public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
+      public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues";
       // Optional, exist for MV variable-length types
       // NOTE: Added in release 1.6.0. Only exist in segment created after 1.6.0 release.
       public static final String MAX_ROW_LENGTH_IN_BYTES = "maxRowLengthInBytes";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index bd2bb43ee6e..7fa13f93ab9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -102,9 +102,9 @@ public class ColumnMetadataImpl implements ColumnMetadata {
     _lengthOfLongestElement = lengthOfLongestElement;
     _isAscii = isAscii;
     _bitsPerElement = bitsPerElement;
+    _totalNumberOfEntries = totalNumberOfEntries;
     _maxNumberOfMultiValues = maxNumberOfMultiValues;
     _maxRowLengthInBytes = maxRowLengthInBytes;
-    _totalNumberOfEntries = totalNumberOfEntries;
     _partitionFunction = partitionFunction;
     _partitions = partitions;
     _autoGenerated = autoGenerated;
@@ -178,13 +178,13 @@ public class ColumnMetadataImpl implements ColumnMetadata {
   }
 
   @Override
-  public int getMaxNumberOfMultiValues() {
-    return _maxNumberOfMultiValues;
+  public int getTotalNumberOfEntries() {
+    return _totalNumberOfEntries;
   }
 
   @Override
-  public int getTotalNumberOfEntries() {
-    return _totalNumberOfEntries;
+  public int getMaxNumberOfMultiValues() {
+    return _maxNumberOfMultiValues;
   }
 
   @Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/SegmentMetadataUtils.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/SegmentMetadataUtils.java
index 22e373efd18..bc50aa05a5c 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/SegmentMetadataUtils.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/SegmentMetadataUtils.java
@@ -54,13 +54,21 @@ public class SegmentMetadataUtils {
     CommonsConfigurationUtils.saveToFile(propertiesConfiguration, metadataFile);
   }
 
+  /// Writes the given key→value updates to the segment's `metadata.properties` and reloads metadata. A `null`
+  /// value removes the property instead of writing it, so callers can drop a key by mapping it to `null`.
   public static SegmentMetadata updateMetadataProperties(SegmentDirectory segmentDirectory,
       Map<String, String> metadataProperties)
       throws Exception {
     SegmentMetadata segmentMetadata = segmentDirectory.getSegmentMetadata();
     PropertiesConfiguration propertiesConfiguration = SegmentMetadataUtils.getPropertiesConfiguration(segmentMetadata);
     for (Map.Entry<String, String> entry : metadataProperties.entrySet()) {
-      propertiesConfiguration.setProperty(entry.getKey(), entry.getValue());
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (value != null) {
+        propertiesConfiguration.setProperty(key, value);
+      } else {
+        propertiesConfiguration.clearProperty(key);
+      }
     }
     savePropertiesConfiguration(propertiesConfiguration, segmentMetadata.getIndexDir());
     // TODO: Revisit if we can save the overhead of reloading the metadata when invoked from ForwardIndexHandler

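The `null`-removes-the-key contract added to `updateMetadataProperties` above can be sketched in isolation. The following is only an illustration, not Pinot code: it uses `java.util.Properties` in place of `PropertiesConfiguration`, and the class name, helper name, and property keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MetadataUpdateSketch {
  // Hypothetical stand-in for the update loop in the diff: a non-null value
  // is written, a null value removes the key.
  static void applyUpdates(Properties props, Map<String, String> updates) {
    for (Map.Entry<String, String> entry : updates.entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();
      if (value != null) {
        props.setProperty(key, value);
      } else {
        props.remove(key);
      }
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("example.staleKey", "old");

    // HashMap is used because Map.of(...) rejects null values.
    Map<String, String> updates = new HashMap<>();
    updates.put("example.newKey", "128");
    updates.put("example.staleKey", null);

    applyUpdates(props, updates);
    System.out.println(props.containsKey("example.staleKey"));
    System.out.println(props.getProperty("example.newKey"));
  }
}
```

A caller that needs to drop a key has to build the map with a type that permits `null` values, such as `HashMap`; `Map.of` throws `NullPointerException` on a `null` value.
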
