This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c823f73af59 Optimise index stats collector for no dict (#16845)
c823f73af59 is described below

commit c823f73af5925bf731d187b66bb8797bd9b9f23f
Author: Krishan Goyal <[email protected]>
AuthorDate: Tue Oct 7 10:19:00 2025 +0530

    Optimise index stats collector for no dict (#16845)
    
    * Add ingestion skip filter for upsert SRT task
    
    * Optimise index stats collector for no dict columns
    
    * Test case fixes
    
    * Add nodict collector to avoid changes to each collector to optimise no 
dictionary columns
    
    * Checkstyle fixes
    
    * Fix SegmentPreProcessorTest
    
    * Linter fix
    
    * Add support for native memory arrays
    
    * Add ULL to approximate cardinality in NoDictColumnStatisticsCollector
    
    * Revert "Fix SegmentPreProcessorTest"
    
    This reverts commit 1fed6a27f0f2d8894814f24035d9a9feac6a495c.
    
    * Fix tests after ULL approximation
    
    * cstyle fixes
    
    * Add tests related to no dict column
    
    * cstyle fixes
    
    * Add support for map type
    
    * cstyle fixes
    
    * 1. Add config to disable nodict column stats.
    2. Add 5% buffer for cardinality estimation and update tests
    
    * Fix test case
    
    * Optimise map support for no dict columns
    
    * Add documentation
    
    * Update bits per element when cardinality changes
    
    * Update cardinality from actual stats collector
    
    * cstyle fixes
    
    * test case fixes
    
    * Use HLL++ (HyperLogLogPlus) instead of ULL as it generally returns approximate cardinality >= actual cardinality. Disable NoDictStatsCollector by default.
    
    * cstyle fixes
    
    * Fix table config flag
    
    * Fix tests after 10% buffer
    
    * Address PR comments
    
    * Rename optimise to optimize
---
 .../resources/memory_estimation/table-config.json  |   3 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  13 +
 .../pinot/queries/CustomReloadQueriesTest.java     | 214 ++++++
 .../ForwardIndexHandlerReloadQueriesTest.java      |  12 -
 .../tests/BaseClusterIntegrationTest.java          |   5 +
 .../stats/MapColumnPreIndexStatsCollector.java     |  29 +-
 .../stats/NoDictColumnStatisticsCollector.java     | 232 ++++++
 .../stats/SegmentPreIndexStatsCollectorImpl.java   |  15 +
 .../segment/index/loader/ForwardIndexHandler.java  |   8 +
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  41 +-
 .../stats/MapColumnPreIndexStatsCollectorTest.java |  97 ++-
 .../stats/NoDictColumnStatisticsCollectorTest.java | 812 +++++++++++++++++++++
 .../SegmentPreIndexStatsCollectorImplTest.java     |  66 ++
 .../index/loader/ForwardIndexHandlerTest.java      |  55 +-
 .../index/loader/SegmentPreProcessorTest.java      |  62 +-
 .../spi/creator/ColumnIndexCreationInfo.java       |  17 +-
 .../pinot/spi/config/table/IndexingConfig.java     |  12 +
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 18 files changed, 1630 insertions(+), 70 deletions(-)
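
For reference, a minimal sketch (not part of this patch; the table and column names are placeholders) of enabling the new flag through TableConfigBuilder, mirroring how the integration tests in this change turn it on:

    import java.util.List;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class OptimizeNoDictStatsConfigSketch {
      public static TableConfig buildTableConfig() {
        // "myTable" and "column1" are placeholders; setOptimizeNoDictStatsCollection is the
        // builder method added by this commit (the flag is off by default).
        return new TableConfigBuilder(TableType.OFFLINE)
            .setTableName("myTable")
            .setNoDictionaryColumns(List.of("column1"))
            .setOptimizeNoDictStatsCollection(true)
            .build();
      }
    }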

diff --git 
a/pinot-controller/src/test/resources/memory_estimation/table-config.json 
b/pinot-controller/src/test/resources/memory_estimation/table-config.json
index b85c4e3e727..c4f1de0011f 100644
--- a/pinot-controller/src/test/resources/memory_estimation/table-config.json
+++ b/pinot-controller/src/test/resources/memory_estimation/table-config.json
@@ -36,7 +36,8 @@
       "stream.kafka.decoder.class.name": 
"com.linkedin.pinot.v2.server.LiKafkaDecoder",
       "stream.kafka.topic.name": "UserGeneratedContentGestureCountEvent",
       "streamType": "kafka"
-    }
+    },
+    "optimizeNoDictStatsCollection": false
   },
   "tableName": "testTable",
   "tableType": "REALTIME",
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 44189807c92..e87990c1dc8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -59,6 +59,7 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.intellij.lang.annotations.Language;
 
 import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
 
 
 /**
@@ -325,4 +326,16 @@ public abstract class BaseQueriesTest {
         serverPinotQuery == pinotQuery ? brokerRequest : 
CalciteSqlCompiler.convertToBrokerRequest(serverPinotQuery);
     return reduceOnDataTable(brokerRequest, serverBrokerRequest, dataTableMap);
   }
+
+  protected void validateBeforeAfterQueryResults(List<Object[]> beforeResults, 
List<Object[]> afterResults) {
+    assertEquals(beforeResults.size(), afterResults.size());
+    for (int i = 0; i < beforeResults.size(); i++) {
+      Object[] resultRow1 = beforeResults.get(i);
+      Object[] resultRow2 = afterResults.get(i);
+      assertEquals(resultRow1.length, resultRow2.length);
+      for (int j = 0; j < resultRow1.length; j++) {
+        assertEquals(resultRow1[j], resultRow2[j]);
+      }
+    }
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java
new file mode 100644
index 00000000000..adc77a70d08
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/CustomReloadQueriesTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+
+
+public class CustomReloadQueriesTest extends BaseQueriesTest {
+
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
CustomReloadQueriesTest.class.getSimpleName());
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  private TableConfig createTableConfig(List<String> noDictionaryColumns, 
List<String> invertedIndexColumns,
+      List<String> rangeIndexColumns, List<FieldConfig> fieldConfigs) {
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        
.setNoDictionaryColumns(noDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
+        
.setRangeIndexColumns(rangeIndexColumns).setFieldConfigList(fieldConfigs)
+        .setOptimizeNoDictStatsCollection(true)
+        .build();
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @DataProvider(name = "alphabets")
+  public static Object[][] alphabets() {
+    // 2 sets of input data - sorted and unsorted
+    return new Object[][] {
+        { new String[]{"a", "b", "c", "d", "e"} },
+        { new String[]{"b", "c", "a", "e", "d"} }
+    };
+  }
+
+  /**
+   * If a column's approximate cardinality is less than its actual cardinality and, as a result, its bits
+   * per element is also reduced, then enabling a dictionary for that column should update both the bits
+   * per element and the cardinality. This test verifies both the segment metadata and the query results.
+   * @throws Exception
+   */
+  @Test(dataProvider = "alphabets")
+  public void 
testReducedBitsPerElementWithNoDictCardinalityApproximation(String[] alphabets)
+      throws Exception {
+
+    // Common variables - schema, data file, etc
+    File csvFile = new File(FileUtils.getTempDirectory(), "data.csv");
+    List<String> values = new ArrayList<>(Arrays.asList(alphabets));
+    String columnName = "column1";
+    writeCsv(csvFile, values, columnName);
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension(columnName, FieldSpec.DataType.STRING)
+        .build();
+
+    // Load segment with no dictionary column and get segment metadata
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(new FieldConfig(
+        columnName, FieldConfig.EncodingType.RAW, List.of(), 
FieldConfig.CompressionCodec.SNAPPY, null));
+    TableConfig tableConfig = createTableConfig(List.of(), List.of(), 
List.of(), fieldConfigs);
+    ImmutableSegment segment = buildNewSegment(tableConfig, schema, 
csvFile.getAbsolutePath());
+    Map<String, ColumnMetadata> columnMetadataMap = 
segment.getSegmentMetadata().getColumnMetadataMap();
+
+    ColumnMetadata columnMetadata1 = columnMetadataMap.get(columnName);
+    assertFalse(columnMetadata1.hasDictionary());
+    assertNull(segment.getDictionary(columnName));
+
+    String query = "SELECT column1, count(*) FROM testTable GROUP BY column1 
ORDER BY column1";
+    BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+    List<Object[]> resultRows1 = 
brokerResponseNative.getResultTable().getRows();
+    assertEquals(resultRows1.size(), 5);
+
+    // Make column1 dictionary encoded and reload table
+    fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(new FieldConfig(
+        columnName, FieldConfig.EncodingType.DICTIONARY, List.of(), 
FieldConfig.CompressionCodec.SNAPPY, null));
+    tableConfig = createTableConfig(List.of(), List.of(), List.of(), 
fieldConfigs);
+    segment = reloadSegment(tableConfig, schema);
+    columnMetadataMap = segment.getSegmentMetadata().getColumnMetadataMap();
+
+    ColumnMetadata columnMetadata2 = columnMetadataMap.get(columnName);
+    assertEquals(columnMetadata2.getCardinality(), 5); // actual cardinality
+    assertEquals(columnMetadata2.getBitsPerElement(), 3); // actual required 
bits per element
+    assertTrue(columnMetadata2.hasDictionary());
+    assertNotNull(segment.getDictionary(columnName));
+
+    brokerResponseNative = getBrokerResponse(query);
+    List<Object[]> resultRows2 = 
brokerResponseNative.getResultTable().getRows();
+    validateBeforeAfterQueryResults(resultRows1, resultRows2);
+  }
+
+  private ImmutableSegment buildNewSegment(TableConfig tableConfig, Schema 
schema, String inputFile)
+      throws Exception {
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setInputFilePath(inputFile);
+    generatorConfig.setFormat(FileFormat.CSV);
+    generatorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(generatorConfig);
+    driver.build();
+
+    ImmutableSegment segment = ImmutableSegmentLoader.load(
+        new File(INDEX_DIR, SEGMENT_NAME), new IndexLoadingConfig(tableConfig, 
schema));
+    if (_indexSegment != null) {
+      _indexSegment.destroy();
+    }
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+    return segment;
+  }
+
+  private ImmutableSegment reloadSegment(TableConfig tableConfig, Schema 
schema)
+      throws Exception {
+    IndexLoadingConfig loadingConfig = new IndexLoadingConfig(tableConfig, 
schema);
+    File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
+    SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(indexDir, 
ReadMode.mmap);
+    try (SegmentPreProcessor preProcessor = new 
SegmentPreProcessor(segmentDirectory, loadingConfig)) {
+      preProcessor.process();
+    }
+    // Replace in-memory segment with reloaded one
+    _indexSegment.destroy();
+    ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
loadingConfig);
+    _indexSegment = segment;
+    _indexSegments = List.of(segment, segment);
+    return segment;
+  }
+
+  private static void writeCsv(File file, List<String> values, String 
columnName) throws IOException {
+    try (FileWriter writer = new FileWriter(file, false)) {
+      writer.append(columnName).append('\n');
+      for (String v : values) {
+        writer.append(v).append('\n');
+      }
+    }
+  }
+}
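
A quick note on the bits-per-element assertion in the test above: with the usual fixed-bit dictionary-id sizing (assumed here to be ceil(log2(cardinality)) with a 1-bit floor), 5 distinct values need 3 bits. A hypothetical helper, not part of this patch, for illustration:

    public final class BitsPerElementSketch {
      // Bits needed to encode dictionary ids for a given cardinality,
      // assuming ceil(log2(cardinality)) sizing with a 1-bit floor.
      static int bitsPerElement(int cardinality) {
        return cardinality <= 1 ? 1 : 32 - Integer.numberOfLeadingZeros(cardinality - 1);
      }

      public static void main(String[] args) {
        System.out.println(bitsPerElement(5)); // 3, matching the getBitsPerElement() assertion above
      }
    }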
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
index fc5b40d7c68..c8411e549dd 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -621,18 +621,6 @@ public class ForwardIndexHandlerReloadQueriesTest extends 
BaseQueriesTest {
     validateBeforeAfterQueryResults(resultRows1, resultRows2);
   }
 
-  private void validateBeforeAfterQueryResults(List<Object[]> beforeResults, 
List<Object[]> afterResults) {
-    assertEquals(beforeResults.size(), afterResults.size());
-    for (int i = 0; i < beforeResults.size(); i++) {
-      Object[] resultRow1 = beforeResults.get(i);
-      Object[] resultRow2 = afterResults.get(i);
-      assertEquals(resultRow1.length, resultRow2.length);
-      for (int j = 0; j < resultRow1.length; j++) {
-        assertEquals(resultRow1[j], resultRow2[j]);
-      }
-    }
-  }
-
   /**
    * As a part of segmentReload, the ForwardIndexHandler will perform the 
following operations:
    *
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 300f9e4ae88..1fed8cc85cb 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -346,6 +346,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setQueryConfig(getQueryConfig())
         .setNullHandlingEnabled(getNullHandlingEnabled())
         .setSegmentPartitionConfig(getSegmentPartitionConfig())
+        .setOptimizeNoDictStatsCollection(true)
         .build();
     // @formatter:on
   }
@@ -420,6 +421,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setStreamConfigs(getStreamConfigs())
         .setNullHandlingEnabled(getNullHandlingEnabled())
         .setSegmentPartitionConfig(getSegmentPartitionConfig())
+        .setOptimizeNoDictStatsCollection(true)
         .setReplicaGroupStrategyConfig(getReplicaGroupStrategyConfig());
   }
 
@@ -443,6 +445,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
             new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
+        .setOptimizeNoDictStatsCollection(true)
         .setUpsertConfig(upsertConfig).build();
   }
 
@@ -495,6 +498,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
             new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
+        .setOptimizeNoDictStatsCollection(true)
         .setUpsertConfig(upsertConfig).build();
   }
 
@@ -522,6 +526,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
         .setDedupConfig(new DedupConfig())
+        .setOptimizeNoDictStatsCollection(true)
         .build();
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 92c58e929b4..99c65e5835b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -23,6 +23,9 @@ import java.util.Arrays;
 import java.util.Map;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.ComplexFieldSpec;
@@ -51,16 +54,24 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   private final Object2ObjectOpenHashMap<String, 
AbstractColumnStatisticsCollector> _keyStats =
       new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
   private final Map<String, Integer> _keyFrequencies = new 
Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
-  private String[] _sortedValues;
+  private String[] _sortedKeys;
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private boolean _sealed = false;
   private ComplexFieldSpec _colFieldSpec;
+  private boolean _createNoDictCollectorsForKeys = false;
 
   public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
     super(column, statsCollectorConfig);
     _sorted = false;
     _colFieldSpec = (ComplexFieldSpec) 
statsCollectorConfig.getFieldSpecForColumn(column);
+    Map<String, FieldIndexConfigs> indexConfigsByCol = 
FieldIndexConfigsUtil.createIndexConfigsByColName(
+        statsCollectorConfig.getTableConfig(), 
statsCollectorConfig.getSchema());
+    boolean isDictionaryEnabled = 
indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled();
+    if (!isDictionaryEnabled) {
+      _createNoDictCollectorsForKeys = 
statsCollectorConfig.getTableConfig().getIndexingConfig()
+          .isOptimizeNoDictStatsCollection();
+    }
   }
 
   public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
@@ -105,7 +116,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public String getMinValue() {
     if (_sealed) {
-      return _sortedValues[0];
+      return _sortedKeys[0];
     }
     throw new IllegalStateException("you must seal the collector first before 
asking for min value");
   }
@@ -113,7 +124,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public String getMaxValue() {
     if (_sealed) {
-      return _sortedValues[_sortedValues.length - 1];
+      return _sortedKeys[_sortedKeys.length - 1];
     }
     throw new IllegalStateException("you must seal the collector first before 
asking for max value");
   }
@@ -121,7 +132,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   @Override
   public String[] getUniqueValuesSet() {
     if (_sealed) {
-      return _sortedValues;
+      return _sortedKeys;
     }
     throw new IllegalStateException("you must seal the collector first before 
asking for unique values set");
   }
@@ -156,8 +167,8 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
           
_keyStats.get(entry.getKey()).collect(_keyStats.get(entry.getKey())._fieldSpec.getDefaultNullValue());
         }
       }
-      _sortedValues = _keyStats.keySet().toArray(new String[0]);
-      Arrays.sort(_sortedValues);
+      _sortedKeys = _keyStats.keySet().toArray(new String[0]);
+      Arrays.sort(_sortedKeys);
 
       // Iterate through every key stats collector and seal them
       for (AbstractColumnStatisticsCollector keyStatsCollector : 
_keyStats.values()) {
@@ -182,6 +193,10 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
         .addField(new DimensionFieldSpec(key, convertToDataType(type), 
false)).build();
     StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, 
keySchema, null);
 
+    if (_createNoDictCollectorsForKeys) {
+      return new NoDictColumnStatisticsCollector(key, config);
+    }
+
     switch (type) {
       case INTEGER:
         return new IntColumnPreIndexStatsCollector(key, config);
@@ -200,7 +215,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
     }
   }
 
-  static FieldSpec.DataType convertToDataType(PinotDataType ty) {
+  private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
     // TODO: I've been told that we already have a function to do this, so 
find that function and replace this
     switch (ty) {
       case BOOLEAN:
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java
new file mode 100644
index 00000000000..4962bd95db4
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollector.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Column statistics collector for no-dictionary columns that avoids storing the set of unique values, and thus reduces memory usage.
+ * Behavior:
+ * - getUniqueValuesSet() throws NotImplementedException
+ * - getCardinality() returns approximate cardinality using HLL++
+ * - Doesn't handle cases where values are of different types (e.g. int and 
long). This is expected.
+ *   Individual type collectors (e.g. IntColumnPreIndexStatsCollector) also 
don't handle this case.
+ *   At this point in the Pinot process, the type consistency of a key should 
already be enforced.
+ *   So if such a case is encountered, it will be raised as an exception 
during collect()
+ * Doesn't handle MAP data type as MapColumnPreIndexStatsCollector is 
optimized for no-dictionary collection
+ */
+@SuppressWarnings({"rawtypes"})
+public class NoDictColumnStatisticsCollector extends 
AbstractColumnStatisticsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NoDictColumnStatisticsCollector.class);
+  private Comparable _minValue;
+  private Comparable _maxValue;
+  private int _minLength = Integer.MAX_VALUE;
+  private int _maxLength = -1; // default return value is -1
+  private int _maxRowLength = -1; // default return value is -1
+  private boolean _sealed = false;
+  // HLL Plus generally returns approximate cardinality >= actual cardinality 
which is desired
+  private final HyperLogLogPlus _hllPlus;
+
+  public NoDictColumnStatisticsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
+    super(column, statsCollectorConfig);
+    // Use default p and sp; can be made configurable via StatsCollectorConfig 
later if needed
+    _hllPlus = new HyperLogLogPlus(
+        CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_P,
+        CommonConstants.Helix.DEFAULT_HYPERLOGLOG_PLUS_SP);
+    LOGGER.info("Initialized NoDictColumnStatisticsCollector for column: {}", 
column);
+  }
+
+  @Override
+  public void collect(Object entry) {
+    assert !_sealed;
+    if (entry instanceof Object[]) {
+      Object[] values = (Object[]) entry;
+      int rowLength = 0;
+      for (Object value : values) {
+        if (value instanceof BigDecimal) {
+          // Pinot doesn't support multi-value BigDecimal as of now
+          throw new UnsupportedOperationException();
+        }
+        updateMinMax(value);
+        updateHllPlus(value);
+        int len = getValueLength(value);
+        _minLength = Math.min(_minLength, len);
+        _maxLength = Math.max(_maxLength, len);
+        rowLength += len;
+      }
+      _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, 
values.length);
+      _maxRowLength = Math.max(_maxRowLength, rowLength);
+      updateTotalNumberOfEntries(values);
+    } else if (entry instanceof int[] || entry instanceof long[]
+        || entry instanceof float[] || entry instanceof double[]) {
+      // Native multi-value types don't require length calculation because 
they're not variable-length
+      int length;
+      if (entry instanceof int[]) {
+        int[] values = (int[]) entry;
+        for (int value : values) {
+          updateMinMax(value);
+          updateHllPlus(value);
+        }
+        length = values.length;
+      } else if (entry instanceof long[]) {
+        long[] values = (long[]) entry;
+        for (long value : values) {
+          updateMinMax(value);
+          updateHllPlus(value);
+        }
+        length = values.length;
+      } else if (entry instanceof float[]) {
+        float[] values = (float[]) entry;
+        for (float value : values) {
+          updateMinMax(value);
+          updateHllPlus(value);
+        }
+        length = values.length;
+      } else {
+        double[] values = (double[]) entry;
+        for (double value : values) {
+          updateMinMax(value);
+          updateHllPlus(value);
+        }
+        length = values.length;
+      }
+      _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, length);
+      updateTotalNumberOfEntries(length);
+    } else {
+      Comparable value = toComparable(entry);
+      addressSorted(value);
+      updateMinMax(entry);
+      updateHllPlus(entry);
+      int len = getValueLength(entry);
+      _minLength = Math.min(_minLength, len);
+      _maxLength = Math.max(_maxLength, len);
+      if (isPartitionEnabled()) {
+        updatePartition(value.toString());
+      }
+      _maxRowLength = Math.max(_maxRowLength, len);
+      _totalNumberOfEntries++;
+    }
+  }
+
+  private void updateMinMax(Object value) {
+    Comparable comp = toComparable(value);
+    if (_minValue == null || comp.compareTo(_minValue) < 0) {
+      _minValue = comp;
+    }
+    if (_maxValue == null || comp.compareTo(_maxValue) > 0) {
+      _maxValue = comp;
+    }
+  }
+
+  private Comparable toComparable(Object value) {
+    if (value instanceof byte[]) {
+      return new ByteArray((byte[]) value);
+    }
+    if (value instanceof Comparable) {
+      return (Comparable) value;
+    }
+    throw new IllegalStateException("Unsupported value type " + 
value.getClass());
+  }
+
+  private int getValueLength(Object value) {
+    if (value instanceof byte[]) {
+      return ((byte[]) value).length;
+    }
+    if (value instanceof CharSequence) {
+      return ((CharSequence) 
value).toString().getBytes(StandardCharsets.UTF_8).length;
+    }
+    if (value instanceof BigDecimal) {
+      return BigDecimalUtils.byteSize((BigDecimal) value);
+    }
+    if (value instanceof Number) {
+      return 8; // fixed-width approximation; the exact length is not needed for fixed-width numeric fields
+    }
+    throw new IllegalStateException("Unsupported value type " + 
value.getClass());
+  }
+
+  @Override
+  public Object getMinValue() {
+    if (_sealed) {
+      return _minValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for min value");
+  }
+
+  @Override
+  public Object getMaxValue() {
+    if (_sealed) {
+      return _maxValue;
+    }
+    throw new IllegalStateException("you must seal the collector first before 
asking for max value");
+  }
+
+  @Override
+  public Object getUniqueValuesSet() {
+    throw new NotImplementedException("getUniqueValuesSet is not supported in 
NoDictColumnStatisticsCollector");
+  }
+
+  @Override
+  public int getLengthOfShortestElement() {
+    return _minLength == Integer.MAX_VALUE ? -1 : _minLength;
+  }
+
+  @Override
+  public int getLengthOfLargestElement() {
+    return _maxLength;
+  }
+
+  @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
+  public int getCardinality() {
+    // Get approximate distinct count estimate using HLL++
+    // Increase by 10% to reduce the chance of returning a value lower than the actual cardinality
+    long estimate = Math.round(_hllPlus.cardinality() * 1.1);
+    // There are cases where approximation can overshoot the actual number of 
entries.
+    // Returning a cardinality greater than total entries can break 
assumptions.
+    return estimate > getTotalNumberOfEntries() ? getTotalNumberOfEntries() : 
(int) estimate;
+  }
+
+  @Override
+  public void seal() {
+    _sealed = true;
+  }
+
+  private void updateHllPlus(Object value) {
+    if (value instanceof BigDecimal) {
+      // Canonicalize BigDecimal as string to avoid scale-related equality 
issues
+      _hllPlus.offer(((BigDecimal) value).toString());
+    } else {
+      _hllPlus.offer(value);
+    }
+  }
+}
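
As a usage illustration, a minimal sketch (not from this patch; the table name, column name and sample values are placeholders) that drives the new collector the same way the unit tests added below do: collect raw values, seal, then read the stats:

    import org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
    import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.data.FieldSpec;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class NoDictCollectorUsageSketch {
      public static void main(String[] args) {
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
            .setTableName("testTable")
            .setOptimizeNoDictStatsCollection(true)
            .build();
        Schema schema = new Schema();
        schema.addField(new DimensionFieldSpec("col", FieldSpec.DataType.STRING, true));

        NoDictColumnStatisticsCollector collector =
            new NoDictColumnStatisticsCollector("col", new StatsCollectorConfig(tableConfig, schema, null));
        for (String value : new String[]{"a", "b", "a", "c"}) {
          collector.collect(value);
        }
        collector.seal();

        // Min/max are exact; cardinality is an HLL++ estimate with a 10% buffer, capped at the
        // total number of entries. getUniqueValuesSet() would throw NotImplementedException.
        System.out.println(collector.getMinValue());    // a
        System.out.println(collector.getMaxValue());    // c
        System.out.println(collector.getCardinality()); // approximately 3
      }
    }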
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index 45c75e0c490..c3084057770 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -23,6 +23,9 @@ import java.util.Map;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -44,10 +47,22 @@ public class SegmentPreIndexStatsCollectorImpl implements 
SegmentPreIndexStatsCo
   @Override
   public void init() {
     _columnStatsCollectorMap = new HashMap<>();
+    Map<String, FieldIndexConfigs> indexConfigsByCol = 
FieldIndexConfigsUtil.createIndexConfigsByColName(
+        _statsCollectorConfig.getTableConfig(), 
_statsCollectorConfig.getSchema());
 
     Schema dataSchema = _statsCollectorConfig.getSchema();
     for (FieldSpec fieldSpec : dataSchema.getAllFieldSpecs()) {
       String column = fieldSpec.getName();
+      boolean dictionaryEnabled = 
indexConfigsByCol.get(column).getConfig(StandardIndexes.dictionary()).isEnabled();
+      if (!dictionaryEnabled) {
+        // MAP collector is optimized for no-dictionary collection
+        if 
(!fieldSpec.getDataType().getStoredType().equals(FieldSpec.DataType.MAP)) {
+          if 
(_statsCollectorConfig.getTableConfig().getIndexingConfig().isOptimizeNoDictStatsCollection())
 {
+            _columnStatsCollectorMap.put(column, new 
NoDictColumnStatisticsCollector(column, _statsCollectorConfig));
+            continue;
+          }
+        }
+      }
       switch (fieldSpec.getDataType().getStoredType()) {
         case INT:
           _columnStatsCollectorMap.put(column, new 
IntColumnPreIndexStatsCollector(column, _statsCollectorConfig));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index a45e0f35947..29ecc13c55b 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -42,6 +42,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreI
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -1026,6 +1027,13 @@ public class ForwardIndexHandler extends 
BaseIndexHandler {
 
   private AbstractColumnStatisticsCollector getStatsCollector(String column, 
DataType storedType) {
     StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
+    boolean dictionaryEnabled = hasIndex(column, StandardIndexes.dictionary());
+    // MAP collector is optimized for no-dictionary collection
+    if (!dictionaryEnabled && storedType != DataType.MAP) {
+      if (_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection()) {
+        return new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
+      }
+    }
     switch (storedType) {
       case INT:
         return new IntColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 45bb854516b..2b4186ce5ea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -41,11 +41,13 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsorte
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import 
org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
@@ -668,6 +670,10 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
           fieldIndexConfigs != null ? 
fieldIndexConfigs.getConfig(StandardIndexes.dictionary())
               : DictionaryIndexConfig.DEFAULT;
       boolean createDictionary = dictionaryIndexConfig.isEnabled();
+      boolean useNoDictColumnStatsCollector = false;
+      if (!dictionaryIndexConfig.isEnabled()) {
+        useNoDictColumnStatsCollector = 
_tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection();
+      }
       StatsCollectorConfig statsCollectorConfig = new 
StatsCollectorConfig(_tableConfig, _schema, null);
       ColumnIndexCreationInfo indexCreationInfo;
       boolean isSingleValue = fieldSpec.isSingleValueField();
@@ -677,8 +683,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getIntOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (Integer) fieldSpec.getDefaultNullValue(), createDictionary);
           }
-          IntColumnPreIndexStatsCollector statsCollector =
-              new IntColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new IntColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -693,8 +700,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getLongOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (Long) fieldSpec.getDefaultNullValue(), createDictionary);
           }
-          LongColumnPreIndexStatsCollector statsCollector =
-              new LongColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new LongColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -709,8 +717,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getFloatOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (Float) fieldSpec.getDefaultNullValue(), createDictionary);
           }
-          FloatColumnPreIndexStatsCollector statsCollector =
-              new FloatColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new FloatColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -725,8 +734,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getDoubleOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (Double) fieldSpec.getDefaultNullValue(), createDictionary);
           }
-          DoubleColumnPreIndexStatsCollector statsCollector =
-              new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -747,8 +757,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
               outputValues[i] = outputValueType.toBigDecimal(outputValues[i]);
             }
           }
-          DoubleColumnPreIndexStatsCollector statsCollector =
-              new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new DoubleColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -763,8 +774,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getStringOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (String) fieldSpec.getDefaultNullValue());
           }
-          StringColumnPreIndexStatsCollector statsCollector =
-              new StringColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new StringColumnPreIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
@@ -778,8 +790,9 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
             outputValues[i] = getBytesOutputValue(outputValues[i], 
isSingleValue, outputValueType,
                 (byte[]) fieldSpec.getDefaultNullValue());
           }
-          BytesColumnPredIndexStatsCollector statsCollector =
-              new BytesColumnPredIndexStatsCollector(column, 
statsCollectorConfig);
+          AbstractColumnStatisticsCollector statsCollector = 
!useNoDictColumnStatsCollector
+              ? new BytesColumnPredIndexStatsCollector(column, 
statsCollectorConfig)
+              : new NoDictColumnStatisticsCollector(column, 
statsCollectorConfig);
           for (Object value : outputValues) {
             statsCollector.collect(value);
           }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
index f7751eeb20e..055feea23d1 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollectorTest.java
@@ -19,9 +19,14 @@
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
 import java.math.BigDecimal;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
@@ -35,9 +40,13 @@ import static org.testng.Assert.*;
 
 public class MapColumnPreIndexStatsCollectorTest {
 
-  private static StatsCollectorConfig newConfig() {
+  private static StatsCollectorConfig newConfig(boolean 
optimiseNoDictStatsCollection) {
     TableConfig tableConfig = new 
TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE)
         .setTableName("testTable")
+        .setOptimizeNoDictStatsCollection(optimiseNoDictStatsCollection)
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(
+            Collections.singletonMap("col", new 
ColumnPartitionConfig("murmur", 4))))
+        .setNoDictionaryColumns(java.util.List.of("col"))
         .build();
 
     Map<String, FieldSpec> children = new HashMap<>();
@@ -76,7 +85,7 @@ public class MapColumnPreIndexStatsCollectorTest {
     r3.put("kFloat", 3.5f);
     r3.put("kBigDec", new BigDecimal("5.25"));
 
-    StatsCollectorConfig statsCollectorConfig = newConfig();
+    StatsCollectorConfig statsCollectorConfig = newConfig(false);
 
     MapColumnPreIndexStatsCollector mapCollector = new 
MapColumnPreIndexStatsCollector("col", statsCollectorConfig);
 
@@ -155,4 +164,88 @@ public class MapColumnPreIndexStatsCollectorTest {
     assertFalse(keyBigDecStats.isSorted());
     assertTrue(keyBigDecStats instanceof 
BigDecimalColumnPreIndexStatsCollector);
   }
+
+  @Test
+  public void testKeyCollectorsUseNoDictWhenEnabledAndMatchOutputs() {
+    // Prepare mixed-type values across keys
+    Map<String, Object> r1 = new HashMap<>();
+    r1.put("kStr", "alpha");
+    r1.put("kInt", 3);
+    r1.put("kLong", 7L);
+    r1.put("kFloat", 1.5f);
+    r1.put("kDouble", 2.25d);
+    r1.put("kBigDec", new BigDecimal("10.01"));
+
+    Map<String, Object> r2 = new HashMap<>();
+    r2.put("kStr", "beta");
+    r2.put("kInt", 3); // duplicate for cardinality checks
+    r2.put("kLong", 2L);
+    r2.put("kFloat", 1.5f); // duplicate for cardinality checks
+    r2.put("kDouble", 0.75d);
+    r2.put("kBigDec", new BigDecimal("10.01")); // duplicate for cardinality 
checks
+
+    Map<String, Object> r3 = new HashMap<>();
+    r3.put("kStr", "alpha");
+    r3.put("kInt", 3);
+    r3.put("kFloat", 3.5f);
+    r3.put("kBigDec", new BigDecimal("5.25"));
+
+    StatsCollectorConfig cfgNoDict = newConfig(true);
+    StatsCollectorConfig cfgDict = newConfig(false);
+
+    MapColumnPreIndexStatsCollector mapNoDict = new 
MapColumnPreIndexStatsCollector("col", cfgNoDict);
+    MapColumnPreIndexStatsCollector mapDict = new 
MapColumnPreIndexStatsCollector("col", cfgDict);
+
+    mapNoDict.collect(r1);
+    mapNoDict.collect(r2);
+    mapNoDict.collect(r3);
+    mapNoDict.seal();
+
+    mapDict.collect(r1);
+    mapDict.collect(r2);
+    mapDict.collect(r3);
+    mapDict.seal();
+
+    // Compare public outputs on the map collectors
+    assertEquals(mapNoDict.getCardinality(), mapDict.getCardinality());
+    assertEquals(mapNoDict.getMinValue(), mapDict.getMinValue());
+    assertEquals(mapNoDict.getMaxValue(), mapDict.getMaxValue());
+    assertEquals(mapNoDict.getTotalNumberOfEntries(), 
mapDict.getTotalNumberOfEntries());
+    assertEquals(mapNoDict.getMaxNumberOfMultiValues(), 
mapDict.getMaxNumberOfMultiValues());
+    assertEquals(mapNoDict.isSorted(), mapDict.isSorted());
+    assertEquals(mapNoDict.getLengthOfShortestElement(), 
mapDict.getLengthOfShortestElement());
+    assertEquals(mapNoDict.getLengthOfLargestElement(), 
mapDict.getLengthOfLargestElement());
+    assertEquals(mapNoDict.getMaxRowLengthInBytes(), 
mapDict.getMaxRowLengthInBytes());
+
+    // Partition metadata
+    PartitionFunction pfNoDict = mapNoDict.getPartitionFunction();
+    PartitionFunction pfDict = mapDict.getPartitionFunction();
+    if (pfNoDict == null || pfDict == null) {
+      assertNull(pfNoDict);
+      assertNull(pfDict);
+    } else {
+      assertEquals(pfNoDict.getName(), pfDict.getName());
+      assertEquals(mapNoDict.getNumPartitions(), mapDict.getNumPartitions());
+      Set<Integer> partsNoDict = mapNoDict.getPartitions();
+      Set<Integer> partsDict = mapDict.getPartitions();
+      assertEquals(partsNoDict, partsDict);
+    }
+
+    // Compare per-key collectors exposed via getKeyStatistics
+    for (String key : mapNoDict.getAllKeyFrequencies().keySet()) {
+      AbstractColumnStatisticsCollector keyNoDict = 
mapNoDict.getKeyStatistics(key);
+      AbstractColumnStatisticsCollector keyDict = 
mapDict.getKeyStatistics(key);
+      assertNotNull(keyNoDict, "missing key in no-dict collector: " + key);
+      assertNotNull(keyDict, "missing key in dict collector: " + key);
+
+      assertEquals(keyNoDict.getCardinality(), keyDict.getCardinality(), 
"cardinality mismatch for key " + key);
+      assertEquals(keyNoDict.getMinValue(), keyDict.getMinValue(), "min 
mismatch for key " + key);
+      assertEquals(keyNoDict.getMaxValue(), keyDict.getMaxValue(), "max 
mismatch for key " + key);
+      assertEquals(keyNoDict.getTotalNumberOfEntries(), 
keyDict.getTotalNumberOfEntries(),
+          "entries mismatch for key " + key);
+      assertEquals(keyNoDict.getMaxNumberOfMultiValues(), 
keyDict.getMaxNumberOfMultiValues(),
+          "max MV mismatch for key " + key);
+      assertEquals(keyNoDict.isSorted(), keyDict.isSorted(), "sorted mismatch 
for key " + key);
+    }
+  }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java
new file mode 100644
index 00000000000..3600f2f0562
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/NoDictColumnStatisticsCollectorTest.java
@@ -0,0 +1,812 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class NoDictColumnStatisticsCollectorTest {
+
+  private static StatsCollectorConfig newConfig(FieldSpec.DataType dataType, 
boolean isSingleValue) {
+    TableConfig tableConfig = new 
TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE)
+        .setTableName("testTable")
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(
+            Collections.singletonMap("col", new 
ColumnPartitionConfig("murmur", 4))))
+        .build();
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec("col", dataType, isSingleValue));
+
+    return new StatsCollectorConfig(tableConfig, schema, 
tableConfig.getIndexingConfig().getSegmentPartitionConfig());
+  }
+
+  @DataProvider(name = "primitiveTypeTestData")
+  public Object[][] primitiveTypeTestData() {
+    return new Object[][] {
+        // Ensure data has exactly 1 duplicate entry and total 4 entries
+
+        // Sorted data
+        {FieldSpec.DataType.INT, new Object[]{5, 5, 10, 20}, true},
+        {FieldSpec.DataType.LONG, new Object[]{1L, 1L, 15L, 25L}, true},
+        {FieldSpec.DataType.FLOAT, new Object[]{1.5f, 1.5f, 3.5f, 7.5f}, true},
+        {FieldSpec.DataType.DOUBLE, new Object[]{2.5, 2.5, 4.5, 8.5}, true},
+
+        // Unsorted data
+        {FieldSpec.DataType.INT, new Object[]{10, 5, 20, 5}, false},
+        {FieldSpec.DataType.LONG, new Object[]{15L, 1L, 25L, 1L}, false},
+        {FieldSpec.DataType.FLOAT, new Object[]{3.5f, 1.5f, 7.5f, 1.5f}, 
false},
+        {FieldSpec.DataType.DOUBLE, new Object[]{4.5, 2.5, 8.5, 2.5}, false}
+    };
+  }
+
+  @DataProvider(name = "stringTypeTestData")
+  public Object[][] stringTypeTestData() {
+    return new Object[][] {
+        // Ensure data has exactly 1 duplicate entry and total 4 entries
+        // Sorted data
+        {new String[]{"a", "a", "bbb", "ccc"}, true},
+        // Unsorted data
+        {new String[]{"bbb", "a", "ccc", "a"}, false}
+    };
+  }
+
+  @DataProvider(name = "bytesTypeTestData")
+  public Object[][] bytesTypeTestData() {
+    return new Object[][] {
+        // Ensure data has exactly 1 duplicate entry and total 4 entries
+        // Sorted data
+        {new byte[][]{new byte[]{1}, new byte[]{1}, new byte[]{2}, new 
byte[]{3}}, true},
+        // Unsorted data
+        {new byte[][]{new byte[]{2}, new byte[]{1}, new byte[]{1}, new 
byte[]{3}}, false}
+    };
+  }
+
+  @DataProvider(name = "bigDecimalTypeTestData")
+  public Object[][] bigDecimalTypeTestData() {
+    return new Object[][] {
+        // Ensure data has exactly 1 duplicate entry and total 4 entries
+        // Sorted data
+        {new BigDecimal[]{
+            new BigDecimal("1.23"), new BigDecimal("1.23"), new 
BigDecimal("2.34"), new BigDecimal("9.99")}, true},
+        // Unsorted data
+        {new BigDecimal[]{
+            new BigDecimal("2.34"), new BigDecimal("1.23"), new 
BigDecimal("9.99"), new BigDecimal("1.23")}, false}
+    };
+  }
+
+  @Test(dataProvider = "primitiveTypeTestData")
+  public void testSVPrimitiveTypes(FieldSpec.DataType dataType, Object[] 
entries, boolean isSorted) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(dataType, true));
+    for (Object entry : entries) {
+      c.collect(entry);
+    }
+    c.seal();
+
+    AbstractColumnStatisticsCollector expectedStatsCollector = null;
+    switch (dataType) {
+      case INT:
+        expectedStatsCollector = new IntColumnPreIndexStatsCollector("col", 
newConfig(dataType, true));
+        break;
+      case LONG:
+        expectedStatsCollector = new LongColumnPreIndexStatsCollector("col", 
newConfig(dataType, true));
+        break;
+      case FLOAT:
+        expectedStatsCollector = new FloatColumnPreIndexStatsCollector("col", 
newConfig(dataType, true));
+        break;
+      case DOUBLE:
+        expectedStatsCollector = new DoubleColumnPreIndexStatsCollector("col", 
newConfig(dataType, true));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + 
dataType);
+    }
+    for (Object entry : entries) {
+      expectedStatsCollector.collect(entry);
+    }
+    expectedStatsCollector.seal();
+
+    assertEquals(c.getCardinality(), 3);
+    assertEquals(c.getMinValue(), expectedStatsCollector.getMinValue());
+    assertEquals(c.getMaxValue(), expectedStatsCollector.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), 4);
+    assertEquals(c.getMaxNumberOfMultiValues(), 0);
+    assertEquals(c.isSorted(), isSorted);
+    assertEquals(c.getLengthOfShortestElement(), 8);
+    assertEquals(c.getLengthOfLargestElement(), 8);
+    assertEquals(c.getMaxRowLengthInBytes(), 8);
+    assertEquals(c.getPartitions(), expectedStatsCollector.getPartitions());
+  }
+
+  @Test(dataProvider = "stringTypeTestData")
+  public void testSVString(String[] entries, boolean isSorted) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.STRING, true));
+    for (String e : entries) {
+      c.collect(e);
+    }
+    c.seal();
+
+    StringColumnPreIndexStatsCollector stringStats = new 
StringColumnPreIndexStatsCollector("col",
+        newConfig(FieldSpec.DataType.STRING, true));
+    for (String e : entries) {
+      stringStats.collect(e);
+    }
+    stringStats.seal();
+
+    assertEquals(c.getCardinality(), 3);
+    assertEquals(c.getMinValue(), "a");
+    assertEquals(c.getMaxValue(), "ccc");
+    assertEquals(c.getTotalNumberOfEntries(), entries.length);
+    assertEquals(c.getMaxNumberOfMultiValues(), 0);
+    assertEquals(c.isSorted(), isSorted);
+    assertEquals(c.getLengthOfShortestElement(), 1);
+    assertEquals(c.getLengthOfLargestElement(), 3);
+    assertEquals(c.getMaxRowLengthInBytes(), 3);
+    assertEquals(c.getPartitions(), stringStats.getPartitions());
+  }
+
+  @Test(dataProvider = "bytesTypeTestData")
+  public void testSVBytes(byte[][] entries, boolean isSorted) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.BYTES, true));
+    for (byte[] e : entries) {
+      c.collect(e);
+    }
+    c.seal();
+
+    BytesColumnPredIndexStatsCollector bytesStats = new 
BytesColumnPredIndexStatsCollector("col",
+        newConfig(FieldSpec.DataType.BYTES, true));
+    for (byte[] e : entries) {
+      bytesStats.collect(e);
+    }
+    bytesStats.seal();
+
+    assertEquals(c.getCardinality(), bytesStats.getCardinality());
+    assertEquals(c.getMinValue(), bytesStats.getMinValue());
+    assertEquals(c.getMaxValue(), bytesStats.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), entries.length);
+    assertEquals(c.getMaxNumberOfMultiValues(), 0);
+    assertEquals(c.isSorted(), isSorted);
+    assertEquals(c.getLengthOfShortestElement(), 
bytesStats.getLengthOfShortestElement());
+    assertEquals(c.getLengthOfLargestElement(), 
bytesStats.getLengthOfLargestElement());
+    assertEquals(c.getMaxRowLengthInBytes(), 
bytesStats.getMaxRowLengthInBytes());
+    assertEquals(c.getPartitions(), bytesStats.getPartitions());
+  }
+
+  @Test(dataProvider = "bigDecimalTypeTestData")
+  public void testSVBigDecimal(BigDecimal[] entries, boolean isSorted) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.BIG_DECIMAL, true));
+    for (BigDecimal e : entries) {
+      c.collect(e);
+    }
+    c.seal();
+
+    BigDecimalColumnPreIndexStatsCollector bigDecimalStats = new 
BigDecimalColumnPreIndexStatsCollector("col",
+        newConfig(FieldSpec.DataType.BIG_DECIMAL, true));
+    for (BigDecimal e : entries) {
+      bigDecimalStats.collect(e);
+    }
+    bigDecimalStats.seal();
+
+    assertEquals(c.getCardinality(), 3);
+    assertEquals(c.getMinValue(), bigDecimalStats.getMinValue());
+    assertEquals(c.getMaxValue(), bigDecimalStats.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), entries.length);
+    assertEquals(c.getMaxNumberOfMultiValues(), 0);
+    assertEquals(c.isSorted(), isSorted);
+    assertEquals(c.getLengthOfShortestElement(), 
bigDecimalStats.getLengthOfShortestElement());
+    assertEquals(c.getLengthOfLargestElement(), 
bigDecimalStats.getLengthOfLargestElement());
+    assertEquals(c.getMaxRowLengthInBytes(), 
bigDecimalStats.getMaxRowLengthInBytes());
+    assertEquals(c.getPartitions(), bigDecimalStats.getPartitions());
+  }
+
+  @DataProvider(name = "primitiveMVTypeTestData")
+  public Object[][] primitiveMVTypeTestData() {
+    return new Object[][] {
+        // Two MV rows with one duplicate across total 4 values -> cardinality 
3
+        {FieldSpec.DataType.INT, new int[][]{new int[]{5, 10}, new int[]{5, 
20}}},
+        {FieldSpec.DataType.LONG, new long[][]{new long[]{1L, 15L}, new 
long[]{1L, 25L}}},
+        {FieldSpec.DataType.FLOAT, new float[][]{new float[]{1.5f, 3.5f}, new 
float[]{1.5f, 7.5f}}},
+        {FieldSpec.DataType.DOUBLE, new double[][]{new double[]{2.5, 4.5}, new 
double[]{2.5, 8.5}}}
+    };
+  }
+
+  @Test(dataProvider = "primitiveMVTypeTestData")
+  public void testMVPrimitiveTypes(FieldSpec.DataType dataType, Object 
entries) {
+    // Validate MV behavior for numeric primitives using native arrays
+    // - isSorted should be false for MV columns
+    // - lengths are not tracked for native MV primitives (expect -1)
+    // - maxNumberOfMultiValues should reflect per-row MV length
+    NoDictColumnStatisticsCollector c;
+    AbstractColumnStatisticsCollector expectedStatsCollector;
+
+    c = new NoDictColumnStatisticsCollector("col", newConfig(dataType, false));
+
+    if (entries instanceof int[][]) {
+      expectedStatsCollector = new IntColumnPreIndexStatsCollector("col", 
newConfig(dataType, false));
+      for (int[] row : (int[][]) entries) {
+        c.collect(row);
+        expectedStatsCollector.collect(row);
+      }
+    } else if (entries instanceof long[][]) {
+      expectedStatsCollector = new LongColumnPreIndexStatsCollector("col", 
newConfig(dataType, false));
+      for (long[] row : (long[][]) entries) {
+        c.collect(row);
+        expectedStatsCollector.collect(row);
+      }
+    } else if (entries instanceof float[][]) {
+      expectedStatsCollector = new FloatColumnPreIndexStatsCollector("col", 
newConfig(dataType, false));
+      for (float[] row : (float[][]) entries) {
+        c.collect(row);
+        expectedStatsCollector.collect(row);
+      }
+    } else {
+      expectedStatsCollector = new DoubleColumnPreIndexStatsCollector("col", 
newConfig(dataType, false));
+      for (double[] row : (double[][]) entries) {
+        c.collect(row);
+        expectedStatsCollector.collect(row);
+      }
+    }
+
+    c.seal();
+    expectedStatsCollector.seal();
+
+    assertEquals(c.getCardinality(), expectedStatsCollector.getCardinality());
+    assertEquals(c.getMinValue(), expectedStatsCollector.getMinValue());
+    assertEquals(c.getMaxValue(), expectedStatsCollector.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), 4);
+    assertEquals(c.getMaxNumberOfMultiValues(), 2);
+    assertFalse(c.isSorted());
+    assertEquals(c.getLengthOfShortestElement(), 
expectedStatsCollector.getLengthOfShortestElement());
+    assertEquals(c.getLengthOfLargestElement(), 
expectedStatsCollector.getLengthOfLargestElement());
+    assertEquals(c.getMaxRowLengthInBytes(), 
expectedStatsCollector.getMaxRowLengthInBytes());
+    assertEquals(c.getPartitions(), expectedStatsCollector.getPartitions());
+  }
+
+  @DataProvider(name = "stringMVTypeTestData")
+  public Object[][] stringMVTypeTestData() {
+    return new Object[][] {
+        // Two MV rows with one duplicate across total 4 values -> cardinality 
3
+        {new String[][]{new String[]{"a", "bbb"}, new String[]{"a", "ccc"}}}
+    };
+  }
+
+  @Test(dataProvider = "stringMVTypeTestData")
+  public void testMVString(String[][] rows) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.STRING, false));
+    for (String[] r : rows) {
+      c.collect(r);
+    }
+    c.seal();
+
+    StringColumnPreIndexStatsCollector stringStats = new 
StringColumnPreIndexStatsCollector("col",
+        newConfig(FieldSpec.DataType.STRING, false));
+    for (String[] r : rows) {
+      stringStats.collect(r);
+    }
+    stringStats.seal();
+
+    assertEquals(c.getCardinality(), stringStats.getCardinality());
+    assertEquals(c.getMinValue(), stringStats.getMinValue());
+    assertEquals(c.getMaxValue(), stringStats.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), 4);
+    assertEquals(c.getMaxNumberOfMultiValues(), 2);
+    assertFalse(c.isSorted());
+    assertEquals(c.getLengthOfShortestElement(), 1);
+    assertEquals(c.getLengthOfLargestElement(), 3);
+    assertEquals(c.getMaxRowLengthInBytes(), 4);
+    assertEquals(c.getPartitions(), stringStats.getPartitions());
+  }
+
+  @DataProvider(name = "bytesMVTypeTestData")
+  public Object[][] bytesMVTypeTestData() {
+    return new Object[][] {
+        // Two MV rows with one duplicate across total 4 values
+        {new byte[][][]{new byte[][]{new byte[]{1}, new byte[]{2}}, new 
byte[][]{new byte[]{1}, new byte[]{3}}}}
+    };
+  }
+
+  @Test(dataProvider = "bytesMVTypeTestData")
+  public void testMVBytes(byte[][][] rows) {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.BYTES, false));
+    for (byte[][] r : rows) {
+      c.collect(r);
+    }
+    c.seal();
+
+    BytesColumnPredIndexStatsCollector bytesStats = new 
BytesColumnPredIndexStatsCollector("col",
+        newConfig(FieldSpec.DataType.BYTES, false));
+    for (byte[][] r : rows) {
+      bytesStats.collect(r);
+    }
+    bytesStats.seal();
+
+    assertEquals(c.getCardinality(), bytesStats.getCardinality());
+    assertEquals(c.getMinValue(), bytesStats.getMinValue());
+    assertEquals(c.getMaxValue(), bytesStats.getMaxValue());
+    assertEquals(c.getTotalNumberOfEntries(), 4);
+    assertEquals(c.getMaxNumberOfMultiValues(), 2);
+    assertFalse(c.isSorted());
+    assertEquals(c.getLengthOfShortestElement(), 
bytesStats.getLengthOfShortestElement());
+    assertEquals(c.getLengthOfLargestElement(), 
bytesStats.getLengthOfLargestElement());
+    assertEquals(c.getMaxRowLengthInBytes(), 
bytesStats.getMaxRowLengthInBytes());
+    assertEquals(c.getPartitions(), bytesStats.getPartitions());
+  }
+
+  @Test
+  public void testMVBigDecimal()
+      throws Exception {
+    NoDictColumnStatisticsCollector c = new 
NoDictColumnStatisticsCollector("col",
+        newConfig(FieldSpec.DataType.BIG_DECIMAL, false));
+    try {
+      c.collect(new Object[] {new BigDecimal("1.1"), new BigDecimal("2.2")});
+      fail("Expected UnsupportedOperationException");
+    } catch (UnsupportedOperationException expected) {
+      // expected: BigDecimal MV not supported by NoDict collector by design
+    }
+  }
+
+  // A test that picks a random data type, generates random data for that type 
(sorted or unsorted,
+  // single or multi value, high or low or medium cardinality), collects stats 
using NoDictColumnStatisticsCollector
+  // and the corresponding PreIndexStatsCollector, and compares the stats.
+  // Runs this setup multiple times to cover different scenarios.
+  @Test
+  public void testRandomizedDataComparison() throws Exception {
+    Random random = new Random();
+
+    // Run multiple iterations with different random scenarios
+    for (int iteration = 0; iteration < 100; iteration++) {
+      runRandomizedTest(random, iteration);
+    }
+  }
+
+  private void runRandomizedTest(Random random, int iteration) throws 
Exception {
+    // Randomly select data type (excluding BIG_DECIMAL for MV as it's 
unsupported)
+    FieldSpec.DataType[] supportedTypes = {
+        FieldSpec.DataType.INT, FieldSpec.DataType.LONG, 
FieldSpec.DataType.FLOAT,
+        FieldSpec.DataType.DOUBLE, FieldSpec.DataType.STRING, 
FieldSpec.DataType.BYTES,
+        FieldSpec.DataType.BIG_DECIMAL
+    };
+    FieldSpec.DataType dataType = 
supportedTypes[random.nextInt(supportedTypes.length)];
+
+    // Randomly choose single vs multi-value (skip MV for BIG_DECIMAL)
+    boolean isSingleValue = dataType == FieldSpec.DataType.BIG_DECIMAL || 
random.nextBoolean();
+
+    // Randomly choose cardinality level
+    CardinalityLevel cardinalityLevel = 
CardinalityLevel.values()[random.nextInt(CardinalityLevel.values().length)];
+
+    // Randomly choose if data should be sorted
+    boolean shouldBeSorted = random.nextBoolean();
+
+    try {
+      runTestForConfiguration(dataType, isSingleValue, cardinalityLevel, 
shouldBeSorted, random, iteration);
+    } catch (Exception e) {
+      throw new RuntimeException(String.format(
+          "Test failed for iteration %d: dataType=%s, isSingleValue=%s, 
cardinality=%s, sorted=%s",
+          iteration, dataType, isSingleValue, cardinalityLevel, 
shouldBeSorted), e);
+    }
+  }
+
+  private enum CardinalityLevel {
+    LOW(1, 20),      // 1-20 unique values
+    MEDIUM(50, 100), // 50-100 unique values
+    HIGH(200, 500);  // 200-500 unique values
+
+    private final int _minUnique;
+    private final int _maxUnique;
+
+    CardinalityLevel(int minUnique, int maxUnique) {
+      _minUnique = minUnique;
+      _maxUnique = maxUnique;
+    }
+
+    public int getUniqueCount(Random random) {
+      return _minUnique + random.nextInt(_maxUnique - _minUnique + 1);
+    }
+  }
+
+  private void runTestForConfiguration(FieldSpec.DataType dataType, boolean 
isSingleValue,
+      CardinalityLevel cardinalityLevel, boolean shouldBeSorted, Random 
random, int iteration) throws Exception {
+
+    int uniqueValueCount = cardinalityLevel.getUniqueCount(random);
+    int totalEntries = uniqueValueCount + random.nextInt(uniqueValueCount * 
2); // Add some duplicates
+
+    // Generate test data
+    Object[] testData = generateData(dataType, isSingleValue, 
uniqueValueCount, totalEntries, shouldBeSorted, random);
+
+    // Skip if we can't generate valid test data
+    if (testData == null) {
+      return;
+    }
+
+    // Create collectors
+    NoDictColumnStatisticsCollector noDictCollector =
+        new NoDictColumnStatisticsCollector("col", newConfig(dataType, 
isSingleValue));
+    AbstractColumnStatisticsCollector expectedCollector =
+        createDictCollector(dataType, isSingleValue);
+
+    // Collect stats from both collectors
+    for (Object entry : testData) {
+      noDictCollector.collect(entry);
+      expectedCollector.collect(entry);
+    }
+
+    noDictCollector.seal();
+    expectedCollector.seal();
+
+    // Compare all stats
+    compareCollectorStats(noDictCollector, expectedCollector, dataType, 
isSingleValue, iteration, testData);
+  }
+
+  private Object[] generateData(FieldSpec.DataType dataType, boolean 
isSingleValue,
+      int uniqueValueCount, int totalEntries, boolean shouldBeSorted, Random 
random) {
+
+    try {
+      switch (dataType) {
+        case INT:
+          return generateIntData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case LONG:
+          return generateLongData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case FLOAT:
+          return generateFloatData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case DOUBLE:
+          return generateDoubleData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case STRING:
+          return generateStringData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case BYTES:
+          return generateBytesData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        case BIG_DECIMAL:
+          return generateBigDecimalData(isSingleValue, uniqueValueCount, 
totalEntries, shouldBeSorted, random);
+        default:
+          return null;
+      }
+    } catch (Exception e) {
+      // Return null for unsupported combinations
+      return null;
+    }
+  }
+
+  private Object[] generateIntData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    Set<Integer> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(random.nextInt(10000));
+    }
+    List<Integer> values = new ArrayList<>(uniqueValues);
+
+    if (isSingleValue) {
+      return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+    } else {
+      return generateMultiValueIntData(values, totalEntries, random);
+    }
+  }
+
+  private Object[] generateLongData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    Set<Long> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(random.nextLong() % 100000L);
+    }
+    List<Long> values = new ArrayList<>(uniqueValues);
+
+    if (isSingleValue) {
+      return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+    } else {
+      return generateMultiValueLongData(values, totalEntries, random);
+    }
+  }
+
+  private Object[] generateFloatData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    Set<Float> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(random.nextFloat() * 1000);
+    }
+    List<Float> values = new ArrayList<>(uniqueValues);
+
+    if (isSingleValue) {
+      return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+    } else {
+      return generateMultiValueFloatData(values, totalEntries, random);
+    }
+  }
+
+  private Object[] generateDoubleData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    Set<Double> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(random.nextDouble() * 1000);
+    }
+    List<Double> values = new ArrayList<>(uniqueValues);
+
+    if (isSingleValue) {
+      return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+    } else {
+      return generateMultiValueDoubleData(values, totalEntries, random);
+    }
+  }
+
+  private Object[] generateStringData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    Set<String> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(generateRandomString(random));
+    }
+    List<String> values = new ArrayList<>(uniqueValues);
+
+    if (isSingleValue) {
+      return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+    } else {
+      return generateMultiValueStringData(values, totalEntries, random);
+    }
+  }
+
+  private Object[] generateBytesData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    List<byte[]> uniqueValues = new ArrayList<>();
+    Set<String> seen = new HashSet<>(); // Use string representation to avoid 
duplicate byte arrays
+    while (uniqueValues.size() < uniqueValueCount) {
+      byte[] bytes = generateRandomBytes(random);
+      String key = Arrays.toString(bytes);
+      if (!seen.contains(key)) {
+        uniqueValues.add(bytes);
+        seen.add(key);
+      }
+    }
+
+    if (isSingleValue) {
+      return generateSingleValueByteArrayData(uniqueValues, totalEntries, 
shouldBeSorted, random);
+    } else {
+      return generateMultiValueBytesData(uniqueValues, totalEntries, random);
+    }
+  }
+
+  private Object[] generateBigDecimalData(boolean isSingleValue, int 
uniqueValueCount, int totalEntries,
+      boolean shouldBeSorted, Random random) {
+    if (!isSingleValue) {
+      throw new UnsupportedOperationException("BigDecimal MV not supported");
+    }
+
+    Set<BigDecimal> uniqueValues = new HashSet<>();
+    while (uniqueValues.size() < uniqueValueCount) {
+      uniqueValues.add(new BigDecimal(random.nextDouble() * 1000).setScale(2, 
BigDecimal.ROUND_HALF_UP));
+    }
+    List<BigDecimal> values = new ArrayList<>(uniqueValues);
+
+    return generateSingleValueData(values, totalEntries, shouldBeSorted, 
random).toArray();
+  }
+
+  private <T extends Comparable<T>> List<T> generateSingleValueData(List<T> 
uniqueValues,
+      int totalEntries, boolean shouldBeSorted, Random random) {
+    List<T> result = new ArrayList<>();
+    for (int i = 0; i < totalEntries; i++) {
+      result.add(uniqueValues.get(random.nextInt(uniqueValues.size())));
+    }
+
+    if (shouldBeSorted) {
+      Collections.sort(result);
+    }
+
+    return result;
+  }
+
+  private Object[] generateSingleValueByteArrayData(List<byte[]> uniqueValues,
+      int totalEntries, boolean shouldBeSorted, Random random) {
+    List<byte[]> result = new ArrayList<>();
+    for (int i = 0; i < totalEntries; i++) {
+      result.add(uniqueValues.get(random.nextInt(uniqueValues.size())));
+    }
+
+    if (shouldBeSorted) {
+      // sort result by comparing ByteArray representation of each entry
+      result.sort((a, b) -> {
+        ByteArray a1 = new ByteArray(a);
+        ByteArray b1 = new ByteArray(b);
+        return a1.compareTo(b1);
+      });
+    }
+
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueIntData(List<Integer> values, int 
totalEntries, Random random) {
+    List<int[]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3); // 1-3 values per MV entry
+      int[] mvEntry = new int[Math.min(mvSize, totalEntries - entriesAdded)];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueLongData(List<Long> values, int 
totalEntries, Random random) {
+    List<long[]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3);
+      long[] mvEntry = new long[Math.min(mvSize, totalEntries - entriesAdded)];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueFloatData(List<Float> values, int 
totalEntries, Random random) {
+    List<float[]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3);
+      float[] mvEntry = new float[Math.min(mvSize, totalEntries - 
entriesAdded)];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueDoubleData(List<Double> values, int 
totalEntries, Random random) {
+    List<double[]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3);
+      double[] mvEntry = new double[Math.min(mvSize, totalEntries - 
entriesAdded)];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueStringData(List<String> values, int 
totalEntries, Random random) {
+    List<String[]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3);
+      String[] mvEntry = new String[Math.min(mvSize, totalEntries - 
entriesAdded)];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private Object[] generateMultiValueBytesData(List<byte[]> values, int 
totalEntries, Random random) {
+    List<byte[][]> result = new ArrayList<>();
+    int entriesAdded = 0;
+    while (entriesAdded < totalEntries) {
+      int mvSize = 1 + random.nextInt(3);
+      byte[][] mvEntry = new byte[Math.min(mvSize, totalEntries - 
entriesAdded)][];
+      for (int i = 0; i < mvEntry.length; i++) {
+        mvEntry[i] = values.get(random.nextInt(values.size()));
+      }
+      result.add(mvEntry);
+      entriesAdded += mvEntry.length;
+    }
+    return result.toArray();
+  }
+
+  private String generateRandomString(Random random) {
+    int length = 1 + random.nextInt(10); // 1-10 characters
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      sb.append((char) ('a' + random.nextInt(26)));
+    }
+    return sb.toString();
+  }
+
+  private byte[] generateRandomBytes(Random random) {
+    int length = 1 + random.nextInt(5); // 1-5 bytes
+    byte[] bytes = new byte[length];
+    random.nextBytes(bytes);
+    return bytes;
+  }
+
+  private AbstractColumnStatisticsCollector 
createDictCollector(FieldSpec.DataType dataType, boolean isSingleValue) {
+    switch (dataType) {
+      case INT:
+        return new IntColumnPreIndexStatsCollector("col", newConfig(dataType, 
isSingleValue));
+      case LONG:
+        return new LongColumnPreIndexStatsCollector("col", newConfig(dataType, 
isSingleValue));
+      case FLOAT:
+        return new FloatColumnPreIndexStatsCollector("col", 
newConfig(dataType, isSingleValue));
+      case DOUBLE:
+        return new DoubleColumnPreIndexStatsCollector("col", 
newConfig(dataType, isSingleValue));
+      case STRING:
+        return new StringColumnPreIndexStatsCollector("col", 
newConfig(dataType, isSingleValue));
+      case BYTES:
+        return new BytesColumnPredIndexStatsCollector("col", 
newConfig(dataType, isSingleValue));
+      case BIG_DECIMAL:
+        return new BigDecimalColumnPreIndexStatsCollector("col", 
newConfig(dataType, isSingleValue));
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + 
dataType);
+    }
+  }
+
+  private void compareCollectorStats(NoDictColumnStatisticsCollector 
noDictCollector,
+      AbstractColumnStatisticsCollector expectedCollector, FieldSpec.DataType 
dataType,
+      boolean isSingleValue, int iteration, Object[] testData) {
+
+    String context = String.format("Iteration %d, DataType: %s, SingleValue: 
%s, Data: %s",
+        iteration, dataType, isSingleValue, Arrays.deepToString(testData));
+
+    assertTrue(noDictCollector.getCardinality() >= 
expectedCollector.getCardinality(),
+        "Approx Cardinality " + noDictCollector.getCardinality() + " is lower 
than actual cardinality "
+            + expectedCollector.getCardinality() + " for " + context);
+    assertEquals(noDictCollector.getMinValue(), 
expectedCollector.getMinValue(),
+        "MinValue mismatch - " + context);
+    assertEquals(noDictCollector.getMaxValue(), 
expectedCollector.getMaxValue(),
+        "MaxValue mismatch - " + context);
+    assertEquals(noDictCollector.getTotalNumberOfEntries(), 
expectedCollector.getTotalNumberOfEntries(),
+        "TotalNumberOfEntries mismatch - " + context);
+    assertEquals(noDictCollector.getMaxNumberOfMultiValues(), 
expectedCollector.getMaxNumberOfMultiValues(),
+        "MaxNumberOfMultiValues mismatch - " + context);
+    assertEquals(noDictCollector.isSorted(), expectedCollector.isSorted(),
+        "isSorted mismatch - " + context);
+    if (dataType != FieldSpec.DataType.INT && dataType != 
FieldSpec.DataType.LONG
+        && dataType != FieldSpec.DataType.FLOAT && dataType != 
FieldSpec.DataType.DOUBLE) {
+      if (dataType != FieldSpec.DataType.STRING) {
+        // StringCollector currently does not return shortest element length
+        assertEquals(noDictCollector.getLengthOfShortestElement(), 
expectedCollector.getLengthOfShortestElement(),
+            "LengthOfShortestElement mismatch - " + context);
+      }
+      assertEquals(noDictCollector.getLengthOfLargestElement(), 
expectedCollector.getLengthOfLargestElement(),
+          "LengthOfLargestElement mismatch - " + context);
+      assertEquals(noDictCollector.getMaxRowLengthInBytes(), 
expectedCollector.getMaxRowLengthInBytes(),
+          "MaxRowLengthInBytes mismatch - " + context);
+    }
+    assertEquals(noDictCollector.getPartitions(), 
expectedCollector.getPartitions(),
+        "Partitions mismatch - " + context);
+  }
+}
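
For context on the randomized test above: per the commit message, the no-dict collector approximates cardinality with HLL++ because its estimate generally lands at or above the true distinct count, with a small buffer added on top. A minimal standalone sketch of that exact-vs-approximate comparison, assuming the stream-lib HyperLogLogPlus class is available; this is illustrative only, not the collector's implementation:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Sketch: compares an exact distinct count with an HLL++ estimate, the same
// relationship (approx >= exact) that the randomized test asserts.
public class HllPlusCardinalitySketch {
  public static void main(String[] args) {
    Random random = new Random(42);
    Set<Integer> exact = new HashSet<>();
    HyperLogLogPlus hllPlus = new HyperLogLogPlus(14); // p = 14 is an assumed precision
    for (int i = 0; i < 100_000; i++) {
      int value = random.nextInt(50_000);
      exact.add(value);     // exact distinct count, kept only for comparison
      hllPlus.offer(value); // approximate distinct count
    }
    System.out.println("exact=" + exact.size() + ", approx=" + hllPlus.cardinality());
  }
}
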
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java
new file mode 100644
index 00000000000..25bcdebd1a5
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImplTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class SegmentPreIndexStatsCollectorImplTest {
+  private StatsCollectorConfig newConfig(Schema schema, TableConfig 
tableConfig) {
+    return new StatsCollectorConfig(tableConfig, schema, null);
+  }
+
+  @Test
+  public void testNoDictCollector() {
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", 
FieldSpec.DataType.STRING).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE)
+        
.setTableName("t").setNoDictionaryColumns(java.util.List.of("c1")).build();
+    SegmentPreIndexStatsCollectorImpl impl = new 
SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig));
+    impl.init();
+    assertTrue(impl.getColumnProfileFor("c1") instanceof 
StringColumnPreIndexStatsCollector);
+  }
+
+  @Test
+  public void testNoDictCollectorEnabled() {
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", 
FieldSpec.DataType.STRING).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE)
+        .setTableName("t").setNoDictionaryColumns(java.util.List.of("c1"))
+        .setOptimizeNoDictStatsCollection(true).build();
+    SegmentPreIndexStatsCollectorImpl impl = new 
SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig));
+    impl.init();
+    assertTrue(impl.getColumnProfileFor("c1") instanceof 
NoDictColumnStatisticsCollector);
+  }
+
+  @Test
+  public void testDictCollector() {
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("c1", 
FieldSpec.DataType.STRING).build();
+    TableConfig tableConfig = new 
TableConfigBuilder(org.apache.pinot.spi.config.table.TableType.OFFLINE)
+        .setTableName("t").build();
+    SegmentPreIndexStatsCollectorImpl impl = new 
SegmentPreIndexStatsCollectorImpl(newConfig(schema, tableConfig));
+    impl.init();
+    assertTrue(impl.getColumnProfileFor("c1") instanceof 
StringColumnPreIndexStatsCollector);
+  }
+}
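
As these tests show, the new collector is opt-in: _optimizeNoDictStatsCollection defaults to false in IndexingConfig, so it only kicks in when the flag is set alongside the no-dictionary column list. A minimal sketch of the opt-in using the builder methods touched by this patch (table and column names are placeholders):

import java.util.List;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class NoDictStatsOptInExample {
  public static TableConfig build() {
    // "myTable" and "c1" are placeholder names; the flag defaults to false.
    return new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable")
        .setNoDictionaryColumns(List.of("c1"))    // raw (no-dictionary) column
        .setOptimizeNoDictStatsCollection(true)   // enable the optimized stats collector
        .build();
  }
}
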
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index c7d7b733aa4..05f2e202655 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -298,6 +298,8 @@ public class ForwardIndexHandlerTest {
   //@formatter:on
 
   private static final Random RANDOM = new Random();
+  private static int _cardinalityOfSvEntries;
+  private static int _cardinalityOfMvEntries;
 
   private static final List<GenericRow> TEST_DATA;
 
@@ -326,21 +328,23 @@ public class ForwardIndexHandlerTest {
     Long[][] tempMVLongRowsForwardIndexDisabled = new 
Long[numRows][maxNumberOfMVEntries];
     byte[][][] tempMVByteRowsForwardIndexDisabled = new 
byte[numRows][maxNumberOfMVEntries][];
 
+    _cardinalityOfSvEntries = 1; // Start with 1 to account for static 
"testRow" entry
+    _cardinalityOfMvEntries = maxNumberOfMVEntries + 1; // Add 1 to account 
for static "testRow" entry
     for (int i = 0; i < numRows; i++) {
       // Adding a fixed value to check for filter queries
       if (i % 10 == 0) {
         String str = "testRow";
         tempStringRows[i] = str;
-        tempIntRows[i] = 1001;
-        tempLongRows[i] = 1001L;
+        tempIntRows[i] = numRows + 1;
+        tempLongRows[i] = (long) (numRows + 1);
         tempBytesRows[i] = str.getBytes();
-        tempBigDecimalRows[i] = BigDecimal.valueOf(1001);
+        tempBigDecimalRows[i] = BigDecimal.valueOf(numRows + 1);
 
         // Avoid creating empty arrays.
         int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1;
         for (int j = 0; j < numMVElements; j++) {
-          tempMVIntRows[i][j] = 1001;
-          tempMVLongRows[i][j] = 1001L;
+          tempMVIntRows[i][j] = numRows + 1;
+          tempMVLongRows[i][j] = (long) (numRows + 1);
           tempMVStringRows[i][j] = str;
           tempMVByteRows[i][j] = str.getBytes();
         }
@@ -351,9 +355,11 @@ public class ForwardIndexHandlerTest {
         tempLongRows[i] = (long) i;
         tempBytesRows[i] = str.getBytes();
         tempBigDecimalRows[i] = BigDecimal.valueOf(i);
+        _cardinalityOfSvEntries += 1;
 
         // Avoid creating empty arrays.
-        int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1;
+        // To test total cardinality, for at least 1 row, have the number of MV 
entries = maxNumberOfMVEntries
+        int numMVElements = (i == 1) ? maxNumberOfMVEntries : 
(RANDOM.nextInt(maxNumberOfMVEntries) + 1);
         for (int j = 0; j < numMVElements; j++) {
           tempMVIntRows[i][j] = j;
           tempMVLongRows[i][j] = (long) j;
@@ -364,7 +370,7 @@ public class ForwardIndexHandlerTest {
 
       // Populate data for the MV columns with forward index disabled to have 
unique entries per row.
       // Avoid creating empty arrays.
-      int numMVElements = RANDOM.nextInt(maxNumberOfMVEntries) + 1;
+      int numMVElements = (i == 1) ? maxNumberOfMVEntries : 
(RANDOM.nextInt(maxNumberOfMVEntries) + 1);
       for (int j = 0; j < numMVElements; j++) {
         String str = "n" + i + j;
         tempMVIntRowsForwardIndexDisabled[i][j] = j;
@@ -570,6 +576,7 @@ public class ForwardIndexHandlerTest {
         .setNoDictionaryColumns(new ArrayList<>(noDictionaryColumns))
         .setInvertedIndexColumns(new ArrayList<>(invertedIndexColumns))
         
.setCreateInvertedIndexDuringSegmentGeneration(true).setRangeIndexColumns(new 
ArrayList<>(rangeIndexColumns))
+        .setOptimizeNoDictStatsCollection(true)
         .setFieldConfigList(new ArrayList<>(fieldConfigMap.values())).build();
   }
 
@@ -1241,7 +1248,11 @@ public class ForwardIndexHandlerTest {
       } else if (dataType == DataType.BIG_DECIMAL) {
         dictionaryElementSize = 4;
       }
-      validateMetadataProperties(column, true, dictionaryElementSize, 
metadata.getCardinality(),
+      int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column.equals(DIM_MV_PASS_THROUGH_LONG)
+          ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+      // DIM_RAW_SORTED_INTEGER has all unique values
+      expectedCardinality = column.equals(DIM_RAW_SORTED_INTEGER) ? 
TEST_DATA.size() : expectedCardinality;
+      validateMetadataProperties(column, true, dictionaryElementSize, 
expectedCardinality,
           metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(), metadata.isSingleValue(),
           metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
           metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1283,7 +1294,11 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(column1, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+    int expectedCardinality = column1.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column1.equals(DIM_MV_PASS_THROUGH_LONG)
+        ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+    // DIM_RAW_SORTED_INTEGER has all unique values
+    expectedCardinality = column1.equals(DIM_RAW_SORTED_INTEGER) ? 
TEST_DATA.size() : expectedCardinality;
+    validateMetadataProperties(column1, true, dictionaryElementSize, 
expectedCardinality, metadata.getTotalDocs(),
         dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
         metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
         metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1303,7 +1318,11 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(column2, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+    expectedCardinality = column2.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column2.equals(DIM_MV_PASS_THROUGH_LONG)
+        ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+    // DIM_RAW_SORTED_INTEGER has all unique values
+    expectedCardinality = column2.equals(DIM_RAW_SORTED_INTEGER) ? 
TEST_DATA.size() : expectedCardinality;
+    validateMetadataProperties(column2, true, dictionaryElementSize, 
expectedCardinality, metadata.getTotalDocs(),
         dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
         metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
         metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1498,7 +1517,9 @@ public class ForwardIndexHandlerTest {
       } else if (dataType == DataType.BIG_DECIMAL) {
         dictionaryElementSize = 4;
       }
-      validateMetadataProperties(column, true, dictionaryElementSize, 
metadata.getCardinality(),
+      int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column.equals(DIM_MV_PASS_THROUGH_LONG)
+          ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+      validateMetadataProperties(column, true, dictionaryElementSize, 
expectedCardinality,
           metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(), metadata.isSingleValue(),
           metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
           metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1544,7 +1565,9 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(column1, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+    int expectedCardinality = column1.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column1.equals(DIM_MV_PASS_THROUGH_LONG)
+        ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+    validateMetadataProperties(column1, true, dictionaryElementSize, 
expectedCardinality, metadata.getTotalDocs(),
         dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
         metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
         metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1562,7 +1585,9 @@ public class ForwardIndexHandlerTest {
     } else if (dataType == DataType.BIG_DECIMAL) {
       dictionaryElementSize = 4;
     }
-    validateMetadataProperties(column2, true, dictionaryElementSize, 
metadata.getCardinality(), metadata.getTotalDocs(),
+    expectedCardinality = column2.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column2.equals(DIM_MV_PASS_THROUGH_LONG)
+        ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+    validateMetadataProperties(column2, true, dictionaryElementSize, 
expectedCardinality, metadata.getTotalDocs(),
         dataType, metadata.getFieldType(), metadata.isSorted(), 
metadata.isSingleValue(),
         metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
         metadata.getMinValue(), metadata.getMaxValue(), false);
@@ -1637,7 +1662,9 @@ public class ForwardIndexHandlerTest {
       } else if (dataType == DataType.BIG_DECIMAL) {
         dictionaryElementSize = 4;
       }
-      validateMetadataProperties(column, true, dictionaryElementSize, 
metadata.getCardinality(),
+      int expectedCardinality = column.equals(DIM_MV_PASS_THROUGH_INTEGER) || 
column.equals(DIM_MV_PASS_THROUGH_LONG)
+          ? _cardinalityOfMvEntries : _cardinalityOfSvEntries;
+      validateMetadataProperties(column, true, dictionaryElementSize, 
expectedCardinality,
           metadata.getTotalDocs(), dataType, metadata.getFieldType(), 
metadata.isSorted(), metadata.isSingleValue(),
           metadata.getMaxNumberOfMultiValues(), 
metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
           metadata.getMinValue(), metadata.getMaxValue(), false);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 20e395ad375..8d9364f1035 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -291,6 +291,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
         .setRangeIndexColumns(new ArrayList<>(_rangeIndexColumns))
         .setFieldConfigList(new ArrayList<>(_fieldConfigMap.values()))
         .setNullHandlingEnabled(true)
+        .setOptimizeNoDictStatsCollection(true)
         .setIngestionConfig(_ingestionConfig)
         .build();
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
@@ -386,12 +387,16 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test
   public void testSimpleEnableDictionarySV()
       throws Exception {
+    int approxCardinality = 46934; // derived via 
NoDictColumnStatisticsCollector
+    int approxCardinalityStr = 6; // derived via 
NoDictColumnStatisticsCollector
     // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for 
all existing raw columns.
     buildV1Segment();
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, ChunkCompressionType.LZ4,
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, approxCardinalityStr, 
3, _schema, false,
+        false, false, 0, ChunkCompressionType.LZ4,
         true, 0, DataType.STRING, 100000);
-    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW, 42242, 16, 
false, false, false, 0, true, 0,
-        ChunkCompressionType.LZ4, false, DataType.INT, 100000);
+    // Since the dictionary is disabled, the cardinality will be approximate.
+    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW, 
approxCardinality, 16, false, false, false, 0,
+        true, 0, ChunkCompressionType.LZ4, false, DataType.INT, 100000);
 
     // Convert the segment to V3.
     convertV1SegmentToV3();
@@ -410,9 +415,11 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test
   public void testSimpleEnableDictionaryMV()
       throws Exception {
+    int approxCardinality = 20516; // derived via 
NoDictColumnStatisticsCollector
     // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for 
all existing raw columns.
     buildV1Segment();
-    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, 
false, false, false, 0,
+    // Since the dictionary is disabled, the cardinality will be approximate.
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, approxCardinality, 15, 
_schema, false, false, false, 0,
         ChunkCompressionType.LZ4, false, 13, DataType.INT, 106688);
 
     // Convert the segment to V3.
@@ -427,6 +434,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test
   public void testEnableDictAndOtherIndexesSV()
       throws Exception {
+    int approxCardinality = 46934; // derived via 
NoDictColumnStatisticsCollector
+
     // TEST 1: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted 
index and text index. Reload code path
     // will create dictionary, inverted index and text index.
     buildV3Segment();
@@ -444,11 +453,13 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
 
     // TEST 2: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that 
already has text index.
     resetIndexConfigs();
+    int approxCardinalityStr = 6; // derived via 
NoDictColumnStatisticsCollector
     _fieldConfigMap.put(EXISTING_STRING_COL_RAW,
         new FieldConfig(EXISTING_STRING_COL_RAW, FieldConfig.EncodingType.RAW, 
List.of(FieldConfig.IndexType.TEXT),
             null, null));
     buildV3Segment();
-    validateIndex(StandardIndexes.text(), EXISTING_STRING_COL_RAW, 5, 3, 
false, false, false, 0, true, 0, null, false,
+    validateIndex(StandardIndexes.text(), EXISTING_STRING_COL_RAW, 
approxCardinalityStr, 3, false, false,
+        false, 0, true, 0, null, false,
         DataType.STRING, 100000);
 
     // At this point, the segment has text index. Now, the reload path should 
create a dictionary.
@@ -465,7 +476,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     resetIndexConfigs();
     _rangeIndexColumns.add(EXISTING_INT_COL_RAW);
     buildV3Segment();
-    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, 42242, 16, 
false, false, false, 0, true, 0,
+    // Since the dictionary is disabled, the cardinality will be approximate.
+    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW, 
approxCardinality, 16, false, false, false, 0, true, 0,
         ChunkCompressionType.LZ4, false, DataType.INT, 100000);
     long oldRangeIndexSize = new 
SegmentMetadataImpl(INDEX_DIR).getColumnMetadataFor(EXISTING_INT_COL_RAW)
         .getIndexSizeFor(StandardIndexes.range());
@@ -484,6 +496,8 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test
   public void testEnableDictAndOtherIndexesMV()
       throws Exception {
+    int approxCardinality = 20516; // derived via 
NoDictColumnStatisticsCollector
+
     // TEST 1: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. 
Also enable inverted index and range index.
     buildV3Segment();
     _noDictionaryColumns.remove(EXISTING_INT_COL_RAW_MV);
@@ -500,10 +514,11 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     resetIndexConfigs();
     _rangeIndexColumns.add(EXISTING_INT_COL_RAW_MV);
     buildV3Segment();
-    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW_MV, 18499, 
15, false, false, false, 0, false, 13,
-        ChunkCompressionType.LZ4, false, DataType.INT, 106688);
-    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW_MV, 18499, 15, 
false, false, false, 0, false, 13,
-        ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    // Since the dictionary is disabled, the cardinality will be approximate.
+    validateIndex(StandardIndexes.forward(), EXISTING_INT_COL_RAW_MV, 
approxCardinality, 15, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
+    validateIndex(StandardIndexes.range(), EXISTING_INT_COL_RAW_MV, 
approxCardinality, 15, false, false, false, 0,
+        false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688);
 
     // Enable dictionary.
     _noDictionaryColumns.remove(EXISTING_INT_COL_RAW_MV);
@@ -623,27 +638,34 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test
   public void testForwardIndexHandlerChangeCompression()
       throws Exception {
+    int approximateCardinality = 20516; // derived via 
NoDictColumnStatisticsCollector
+    int approxCardinalityStr = 6; // derived via 
NoDictColumnStatisticsCollector
+
     // Test1: Rewriting forward index will be a no-op for v1 segments. Default 
LZ4 compressionType will be retained.
     buildV1Segment();
     _fieldConfigMap.put(EXISTING_STRING_COL_RAW,
         new FieldConfig(EXISTING_STRING_COL_RAW, FieldConfig.EncodingType.RAW, 
List.of(), CompressionCodec.ZSTANDARD,
             null));
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0, ChunkCompressionType.LZ4,
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, approxCardinalityStr, 
3, _schema, false,
+        false, false, 0, ChunkCompressionType.LZ4,
         true, 0, DataType.STRING, 100000);
 
     // Convert the segment to V3.
     convertV1SegmentToV3();
 
     // Test2: Now forward index will be rewritten with ZSTANDARD 
compressionType.
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0,
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, approxCardinalityStr, 
3, _schema, false,
+        false, false, 0,
         ChunkCompressionType.ZSTANDARD, true, 0, DataType.STRING, 100000);
 
     // Test3: Change compression on existing raw index column. Also add text 
index on same column. Check correctness.
     _fieldConfigMap.put(EXISTING_STRING_COL_RAW,
         new FieldConfig(EXISTING_STRING_COL_RAW, FieldConfig.EncodingType.RAW, 
List.of(FieldConfig.IndexType.TEXT),
             CompressionCodec.SNAPPY, null));
-    checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0);
-    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, 
false, false, false, 0, true, 0,
+    checkTextIndexCreation(EXISTING_STRING_COL_RAW, approxCardinalityStr, 3, 
_schema, false,
+        false, false, 0);
+    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 
approxCardinalityStr, 3, false,
+        false, false, 0, true, 0,
         ChunkCompressionType.SNAPPY, false, DataType.STRING, 100000);
 
     // Test4: Change compression on RAW index column. Change another index on 
another column. Check correctness.
@@ -658,14 +680,16 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     // Check FST index
     checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, 
_newColumnsSchemaWithFST, false, false, 26);
     // Check forward index.
-    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, 
false, false, false, 0, true, 0,
+    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 
approxCardinalityStr, 3, false,
+        false, false, 0, true, 0,
         ChunkCompressionType.ZSTANDARD, false, DataType.STRING, 100000);
 
     // Test5: Change compressionType for an MV column
     _fieldConfigMap.put(EXISTING_INT_COL_RAW_MV,
         new FieldConfig(EXISTING_INT_COL_RAW_MV, FieldConfig.EncodingType.RAW, 
List.of(), CompressionCodec.ZSTANDARD,
             null));
-    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, 
false, false, false, 0,
+    // Since the dictionary is disabled, the cardinality will be approximate.
+    checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, approximateCardinality, 
15, _schema, false, false, false, 0,
         ChunkCompressionType.ZSTANDARD, false, 13, DataType.INT, 106688);
   }
 
@@ -700,11 +724,13 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
   @Test(dataProvider = "bothV1AndV3")
   public void testEnableTextIndexOnExistingRawColumn(SegmentVersion 
segmentVersion)
       throws Exception {
+    int approxCardinalityStr = 6; // derived via 
NoDictColumnStatisticsCollector
     buildSegment(segmentVersion);
     _fieldConfigMap.put(EXISTING_STRING_COL_RAW,
         new FieldConfig(EXISTING_STRING_COL_RAW, FieldConfig.EncodingType.RAW, 
List.of(FieldConfig.IndexType.TEXT),
             null, null));
-    checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, 
false, false, 0);
+    checkTextIndexCreation(EXISTING_STRING_COL_RAW, approxCardinalityStr, 3, 
_schema, false,
+        false, false, 0);
   }
 
   /**
@@ -1101,7 +1127,7 @@ public class SegmentPreProcessorTest implements PinotBuffersAfterClassCheckRule
     assertEquals(columnMetadata.getFieldSpec(), spec);
     assertTrue(columnMetadata.isAutoGenerated());
     originalColumnMetadata = segmentMetadata.getColumnMetadataFor("column3");
-    assertEquals(columnMetadata.getCardinality(), originalColumnMetadata.getCardinality());
+    assertEquals(columnMetadata.getCardinality(), 6); // cardinality derived via NoDictColumnStatisticsCollector
     assertEquals(columnMetadata.getBitsPerElement(), originalColumnMetadata.getBitsPerElement());
     assertEquals(columnMetadata.getTotalNumberOfEntries(), originalColumnMetadata.getTotalNumberOfEntries());
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index 9e47502098e..5bd2e4db73d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 
@@ -64,11 +65,23 @@ public class ColumnIndexCreationInfo implements Serializable {
   }
 
   public Object getSortedUniqueElementsArray() {
-    return _columnStatistics.getUniqueValuesSet();
+    try {
+      return _columnStatistics.getUniqueValuesSet();
+    } catch (NotImplementedException e) {
+      return null;
+    }
   }
 
   public int getDistinctValueCount() {
-    Object uniqueValArray = _columnStatistics.getUniqueValuesSet();
+    Object uniqueValArray;
+    try {
+      uniqueValArray = _columnStatistics.getUniqueValuesSet();
+    } catch (NotImplementedException e) {
+      // For no-dictionary columns, we don't retain unique values in collectors to save memory.
+      // Fall back to the collectors' cardinality (tracked as total entries) so downstream components retain
+      // a non-negative effective cardinality for optimizations like scan-based AND reordering.
+      return _columnStatistics.getCardinality();
+    }
     if (uniqueValArray == null) {
       return Constants.UNKNOWN_CARDINALITY;
     }
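
Note on the fallback above: a no-dictionary stats collector is expected to throw NotImplementedException from getUniqueValuesSet() because it never materializes the unique values, while still reporting an approximate distinct count. A minimal, self-contained sketch of that contract (ColumnStatsSketch and NoDictStatsSketch are hypothetical stand-ins, not the actual Pinot SPI; it assumes the stream-lib HyperLogLogPlus offer/cardinality API):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
    import org.apache.commons.lang3.NotImplementedException;

    // Hypothetical stand-in for the statistics contract consumed by getDistinctValueCount().
    interface ColumnStatsSketch {
      Object getUniqueValuesSet();
      int getCardinality();
    }

    // A no-dictionary collector keeps only an HLL++ sketch, so it can report an
    // approximate distinct count without retaining the unique values themselves.
    class NoDictStatsSketch implements ColumnStatsSketch {
      private final HyperLogLogPlus _hll = new HyperLogLogPlus(14);

      void collect(Object value) {
        _hll.offer(value);
      }

      @Override
      public Object getUniqueValuesSet() {
        // Unique values are intentionally not retained; callers fall back to getCardinality().
        throw new NotImplementedException("Unique values are not tracked for no-dictionary columns");
      }

      @Override
      public int getCardinality() {
        // Approximate distinct count read off the HLL++ sketch.
        return (int) _hll.cardinality();
      }
    }

With such a collector, getDistinctValueCount() cannot rely on the unique-value set and instead returns the collector-reported cardinality.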
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 85b224c205b..5a838935056 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -65,6 +65,10 @@ public class IndexingConfig extends BaseJsonConfig {
   private boolean _nullHandlingEnabled;
   private boolean _columnMajorSegmentBuilderEnabled = true;
   private boolean _skipSegmentPreprocess;
+  // If set to true, uses NoDictColumnStatisticsCollector for stats collection for no-dictionary columns.
+  // Once we are confident about the stability of NoDictColumnStatisticsCollector,
+  // we can enable it by default and deprecate this config
+  private boolean _optimizeNoDictStatsCollection = false;
 
   /**
   * If `optimizeDictionary` enabled, dictionary is not created for the high-cardinality
@@ -354,6 +358,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _skipSegmentPreprocess = skipSegmentPreprocess;
   }
 
+  public boolean isOptimizeNoDictStatsCollection() {
+    return _optimizeNoDictStatsCollection;
+  }
+
+  public void setOptimizeNoDictStatsCollection(boolean optimizeNoDictStatsCollection) {
+    _optimizeNoDictStatsCollection = optimizeNoDictStatsCollection;
+  }
+
   public boolean isOptimizeDictionary() {
     return _optimizeDictionary;
   }
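
The flag defaults to false, so segment builds keep the existing per-type collectors unless it is explicitly enabled. A minimal sketch of toggling it programmatically (assuming the default IndexingConfig constructor; the getter and setter are the ones added above):

    import org.apache.pinot.spi.config.table.IndexingConfig;

    public class EnableNoDictStatsCollection {
      public static void main(String[] args) {
        IndexingConfig indexingConfig = new IndexingConfig();
        // Opt in to NoDictColumnStatisticsCollector for no-dictionary columns (off by default).
        indexingConfig.setOptimizeNoDictStatsCollection(true);
        System.out.println(indexingConfig.isOptimizeNoDictStatsCollection()); // prints: true
      }
    }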
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index e5388131f86..07d518fb4b7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -107,6 +107,7 @@ public class TableConfigBuilder {
   private boolean _nullHandlingEnabled;
   private boolean _columnMajorSegmentBuilderEnabled = true;
   private boolean _skipSegmentPreprocess;
+  private boolean _optimizeNoDictStatsCollection = false;
   private List<String> _varLengthDictionaryColumns;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private List<String> _jsonIndexColumns;
@@ -381,6 +382,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setOptimizeNoDictStatsCollection(boolean optimizeNoDictStatsCollection) {
+    _optimizeNoDictStatsCollection = optimizeNoDictStatsCollection;
+    return this;
+  }
+
   public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
     return this;
@@ -511,6 +517,7 @@ public class TableConfigBuilder {
     indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
     indexingConfig.setColumnMajorSegmentBuilderEnabled(_columnMajorSegmentBuilderEnabled);
     indexingConfig.setSkipSegmentPreprocess(_skipSegmentPreprocess);
+    indexingConfig.setOptimizeNoDictStatsCollection(_optimizeNoDictStatsCollection);
     indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
     indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
     indexingConfig.setMultiColumnTextIndexConfig(_multiColumnTextIndexConfig);
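
A minimal usage sketch of the new builder option (the table name here is illustrative; the setter and the wiring into IndexingConfig are the ones added above):

    import org.apache.pinot.spi.config.table.TableConfig;
    import org.apache.pinot.spi.config.table.TableType;
    import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

    public class NoDictStatsTableConfigExample {
      public static void main(String[] args) {
        // Build an offline table config that opts in to optimized no-dictionary stats collection.
        TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
            .setTableName("myTable")
            .setOptimizeNoDictStatsCollection(true)
            .build();
        System.out.println(tableConfig.getIndexingConfig().isOptimizeNoDictStatsCollection()); // prints: true
      }
    }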


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
