krishan1390 commented on code in PR #16727:
URL: https://github.com/apache/pinot/pull/16727#discussion_r2425240734


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+                                              StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {

Review Comment:
   We don't ignore virtual columns in the row-based collector, and SegmentIndexCreationDriverImpl._segmentStats has stats for virtual columns.
   
   Why are we ignoring them here? I'm not sure whether leaving virtual columns out of _segmentStats will break some other assumption.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+                                              StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();

Review Comment:
   initializeStatsCollectors and collectColumnStats needn't be called in the constructor, right?
   
   Shouldn't initializeStatsCollectors be called in the init() method, and similarly collectColumnStats in the build() method?
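   A minimal sketch of the lifecycle split being suggested (hypothetical names, not the actual Pinot classes): the constructor only stores its arguments, while collector setup moves to init() and the expensive column scan moves to build():

   ```java
   // Hypothetical sketch: constructor stays cheap; init() would absorb
   // initializeStatsCollectors() and build() would absorb collectColumnStats().
   class ColumnarStatsLifecycleSketch {
     private boolean _initialized;
     private boolean _built;

     ColumnarStatsLifecycleSketch() {
       // store constructor arguments only; no heavy work here
     }

     void init() {
       // initializeStatsCollectors() would move here from the constructor
       _initialized = true;
     }

     void build() {
       if (!_initialized) {
         throw new IllegalStateException("init() must be called before build()");
       }
       // collectColumnStats() would move here from the constructor
       _built = true;
     }

     boolean isBuilt() {
       return _built;
     }
   }
   ```

   This also matches the existing SegmentPreIndexStatsCollector contract, which already declares init() and build() as separate steps.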



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -171,31 +170,72 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader)
         new TransformPipeline(config.getTableConfig(), config.getSchema()));
   }
 
+  /**
+   * Initialize the driver for columnar segment building using a ColumnReaderFactory.
+   * This method sets up the driver to use column-wise input data access instead of row-wise.
+   *
+   * @param config Segment generator configuration
+   * @param columnReaderFactory Factory for creating column readers
+   * @throws Exception if initialization fails
+   */
+  public void init(SegmentGeneratorConfig config, ColumnReaderFactory columnReaderFactory)
+      throws Exception {
+    // Initialize the column reader factory with target schema
+    columnReaderFactory.init(config.getSchema());
+
+    // Create all column readers for the target schema
+    Map<String, ColumnReader> columnReaders = columnReaderFactory.getAllColumnReaders();
+
+    // Create columnar data source
+    ColumnarSegmentCreationDataSource columnarDataSource =
+        new ColumnarSegmentCreationDataSource(columnReaderFactory, columnReaders);
+
+    // Use the existing init method with columnar data source and no transform pipeline
+    init(config, columnarDataSource, null);

Review Comment:
   Noob question: is it OK to skip the transform pipeline for all use cases where columnar segment build is used?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+                                              StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      switch (fieldSpec.getDataType().getStoredType()) {
+        case INT:
+          _columnStatsCollectorMap.put(columnName,
+              new IntColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case LONG:
+          _columnStatsCollectorMap.put(columnName,
+              new LongColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case FLOAT:
+          _columnStatsCollectorMap.put(columnName,
+              new FloatColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case DOUBLE:
+          _columnStatsCollectorMap.put(columnName,
+              new DoubleColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(columnName,
+              new BigDecimalColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case STRING:
+          _columnStatsCollectorMap.put(columnName,
+              new StringColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BYTES:
+          _columnStatsCollectorMap.put(columnName,
+              new BytesColumnPredIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case MAP:
+          _columnStatsCollectorMap.put(columnName,
+              new MapColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Collect stats by iterating column-wise using the provided ColumnReader instances.
+   */
+  private void collectColumnStats() {
+    LOGGER.info("Collecting stats for {} columns using column-wise iteration", _columnReaders.size());
+
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      AbstractColumnStatisticsCollector statsCollector = _columnStatsCollectorMap.get(columnName);
+      ColumnReader columnReader = _columnReaders.get(columnName);
+
+      if (columnReader == null) {
+        throw new RuntimeException("Column reader for column " + columnName + " not found");
+      }
+
+      LOGGER.debug("Collecting stats for column: {}", columnName);
+      collectStatsFromColumnReader(columnName, columnReader, statsCollector);
+
+      // Seal the stats collector
+      statsCollector.seal();
+    }
+  }
+
+  /**
+   * Collect stats from a column reader by iterating over all values using the iterator pattern.
+   */
+  private void collectStatsFromColumnReader(String columnName, ColumnReader columnReader,
+                                           AbstractColumnStatisticsCollector statsCollector) {
+    try {
+      // Reset the column reader to start from the beginning
+      columnReader.rewind();
+
+      int docCount = 0;
+      while (columnReader.hasNext()) {
+        Object value = columnReader.next();
+        statsCollector.collect(value);
+        docCount++;
+      }
+
+      // if totalDocCount is unset then set total doc count from the first column
+      if (_totalDocCount == -1) {
+        _totalDocCount = docCount;
+      } else if (_totalDocCount != docCount) {
+        // all columns should have same count
+        LOGGER.warn("Column {} has {} documents, but expected {} documents",
+            columnName, docCount, _totalDocCount);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Failed to collect stats for column: {}", columnName, e);
+      throw new RuntimeException("Failed to collect stats for column: " + columnName, e);
+    }
+  }
+
+  @Override
+  public ColumnStatistics getColumnProfileFor(String column) {
+    return _columnStatsCollectorMap.get(column);
+  }
+
+  @Override
+  public int getTotalDocCount() {
+    return _totalDocCount;
+  }
+
+  @Override
+  public void init() {
+    // Already initialized in constructor
+  }
+
+  @Override
+  public void build() {
+    // Stats are already collected in constructor
+  }
+
+  @Override
+  public void logStats() {
+    LOGGER.info("Columnar segment stats collection completed for {} columns with {} documents",

Review Comment:
   We should log the full stats here. Same comment as earlier: refactoring the common code of this class and SegmentPreIndexStatsCollectorImpl would give the same benefits without duplicating code.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -677,4 +706,63 @@ public int getSkippedRowsFound() {
   public int getSanitizedRowsFound() {
     return _sanitizedRowsFound;
   }
+
+  /**
+   * Build segment using columnar approach.
+   * This method builds the segment by processing data column-wise instead of row-wise.
+   *
+   * @throws Exception if segment building fails
+   */
+  public void buildColumnar() throws Exception {
+    if (!(_dataSource instanceof ColumnarSegmentCreationDataSource)) {
+      throw new IllegalStateException("buildColumnar() can only be called after initColumnar()");
+    }
+
+    ColumnarSegmentCreationDataSource columnarDataSource = (ColumnarSegmentCreationDataSource) _dataSource;
+    Map<String, ColumnReader> columnReaders = columnarDataSource.getColumnReaders();
+
+    LOGGER.info("Starting columnar segment building for {} columns", columnReaders.size());
+
+    // Reuse existing stats collection and index creation info logic
+    LOGGER.debug("Start building StatsCollector!");
+    collectStatsAndIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    if (_totalDocs == 0) {
+      LOGGER.warn("No documents found in data source");
+      handlePostCreation();
+      return;
+    }
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir, null);

Review Comment:
   What's the consequence of not setting immutableToMutableIdMap, which is also set in buildByColumn()?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -171,31 +170,72 @@ public void init(SegmentGeneratorConfig config, RecordReader recordReader)
         new TransformPipeline(config.getTableConfig(), config.getSchema()));
   }
 
+  /**
+   * Initialize the driver for columnar segment building using a ColumnReaderFactory.
+   * This method sets up the driver to use column-wise input data access instead of row-wise.
+   *
+   * @param config Segment generator configuration
+   * @param columnReaderFactory Factory for creating column readers
+   * @throws Exception if initialization fails
+   */
+  public void init(SegmentGeneratorConfig config, ColumnReaderFactory columnReaderFactory)
+      throws Exception {
+    // Initialize the column reader factory with target schema
+    columnReaderFactory.init(config.getSchema());
+
+    // Create all column readers for the target schema
+    Map<String, ColumnReader> columnReaders = columnReaderFactory.getAllColumnReaders();
+
+    // Create columnar data source
+    ColumnarSegmentCreationDataSource columnarDataSource =
+        new ColumnarSegmentCreationDataSource(columnReaderFactory, columnReaders);
+
+    // Use the existing init method with columnar data source and no transform pipeline
+    init(config, columnarDataSource, null);
+
+    LOGGER.info("Initialized SegmentIndexCreationDriverImpl for columnar building with {} columns",
+        columnReaders.size());
+  }
+
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
       TransformPipeline transformPipeline)
       throws Exception {
     _config = config;
-    _recordReader = dataSource.getRecordReader();
     _dataSchema = config.getSchema();
     _continueOnError = config.isContinueOnError();
+    String readerClassName = null;
+
+    // Handle columnar data sources differently
+    if (dataSource instanceof ColumnarSegmentCreationDataSource) {
+      // For columnar data sources, we don't have a record reader
+      _recordReader = null;

Review Comment:
   Let's annotate _recordReader and _transformPipeline as Nullable.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+                                              StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();

Review Comment:
   We can probably abstract the common logic that populates _columnStatsCollectorMap out of SegmentPreIndexStatsCollectorImpl and this class, to avoid redundant code.
   
   Also, SegmentPreIndexStatsCollectorImpl has added support for NoDictColumnStatisticsCollector, which needs to be added here too:
   https://github.com/apache/pinot/blob/master/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java#L61
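   One possible shape for the de-duplication (a sketch with made-up names, not the actual Pinot API): a shared static factory keyed on the stored type that both the row-based and columnar collectors call, so new collector types like NoDictColumnStatisticsCollector only need to be registered once:

   ```java
   import java.util.Map;
   import java.util.function.Function;

   // Hypothetical shared factory: both stats collector implementations would
   // call create(...) instead of each duplicating the per-type switch.
   class CollectorFactorySketch {
     // Stand-in for AbstractColumnStatisticsCollector and its subclasses.
     interface Collector {
       String column();
     }

     private static final Map<String, Function<String, Collector>> FACTORIES = Map.of(
         "INT", col -> () -> col,      // stand-in for IntColumnPreIndexStatsCollector
         "LONG", col -> () -> col,     // stand-in for LongColumnPreIndexStatsCollector
         "STRING", col -> () -> col);  // stand-in for StringColumnPreIndexStatsCollector

     static Collector create(String storedType, String column) {
       Function<String, Collector> factory = FACTORIES.get(storedType);
       if (factory == null) {
         throw new IllegalStateException("Unsupported data type: " + storedType);
       }
       return factory.apply(column);
     }
   }
   ```

   With this in place, the switch in initializeStatsCollectors() collapses to a single create(...) call per column.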
 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+                                              StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      switch (fieldSpec.getDataType().getStoredType()) {
+        case INT:
+          _columnStatsCollectorMap.put(columnName,
+              new IntColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case LONG:
+          _columnStatsCollectorMap.put(columnName,
+              new LongColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case FLOAT:
+          _columnStatsCollectorMap.put(columnName,
+              new FloatColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case DOUBLE:
+          _columnStatsCollectorMap.put(columnName,
+              new DoubleColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(columnName,
+              new BigDecimalColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case STRING:
+          _columnStatsCollectorMap.put(columnName,
+              new StringColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BYTES:
+          _columnStatsCollectorMap.put(columnName,
+              new BytesColumnPredIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case MAP:
+          _columnStatsCollectorMap.put(columnName,
+              new MapColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Collect stats by iterating column-wise using the provided ColumnReader instances.
+   */
+  private void collectColumnStats() {
+    LOGGER.info("Collecting stats for {} columns using column-wise iteration", _columnReaders.size());
+
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      AbstractColumnStatisticsCollector statsCollector = _columnStatsCollectorMap.get(columnName);
+      ColumnReader columnReader = _columnReaders.get(columnName);
+
+      if (columnReader == null) {
+        throw new RuntimeException("Column reader for column " + columnName + " not found");
+      }
+
+      LOGGER.debug("Collecting stats for column: {}", columnName);
+      collectStatsFromColumnReader(columnName, columnReader, statsCollector);
+
+      // Seal the stats collector
+      statsCollector.seal();
+    }
+  }
+
+  /**
+   * Collect stats from a column reader by iterating over all values using the iterator pattern.
+   */
+  private void collectStatsFromColumnReader(String columnName, ColumnReader columnReader,
+                                           AbstractColumnStatisticsCollector statsCollector) {
+    try {
+      // Reset the column reader to start from the beginning
+      columnReader.rewind();
+
+      int docCount = 0;
+      while (columnReader.hasNext()) {
+        Object value = columnReader.next();
+        statsCollector.collect(value);

Review Comment:
   Need to handle exceptions and the continueOnError flag in case of an exception here.
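One possible shape for the error handling the comment asks for is sketched below. This is a hedged illustration, not Pinot's API: the `Consumer` stands in for the stats collector's `collect()`, and the `continueOnError` flag and skipped-value counter are assumptions modeled on the row-based driver.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Sketch of per-value error handling with a continueOnError flag.
public class StatsCollectLoop {
  // Feeds values to a collector; skips bad values when continueOnError is set,
  // otherwise fails fast with the offending value in the message.
  static int collectWithErrorHandling(Iterator<Object> values, Consumer<Object> collector,
      boolean continueOnError) {
    int skipped = 0;
    while (values.hasNext()) {
      Object value = values.next();
      try {
        collector.accept(value);
      } catch (RuntimeException e) {
        if (!continueOnError) {
          throw new IllegalStateException("Failed to collect stats for value: " + value, e);
        }
        skipped++; // surface this count in logs so skipped values are visible
      }
    }
    return skipped;
  }

  public static void main(String[] args) {
    List<Object> values = List.of(1, "bad", 3);
    int skipped = collectWithErrorHandling(values.iterator(),
        v -> { if (v instanceof String) { throw new NumberFormatException("not a number"); } },
        true);
    System.out.println(skipped);
  }
}
```

Returning the skipped count lets the caller report it the same way the row-based path reports skipped rows.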



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -396,27 +372,58 @@ public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSe
       if (sortedDocIds != null) {
         int onDiskDocId = 0;
         for (int docId : sortedDocIds) {
-          // If validDodIds are provided, only index column if it's a valid doc
-          if (validDocIds == null || validDocIds.contains(docId)) {
-            indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
-                nullVec);
-            onDiskDocId++;
-          }
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec);
+          onDiskDocId++;
         }
       } else {
-        int onDiskDocId = 0;
         for (int docId = 0; docId < numDocs; docId++) {
-          // If validDodIds are provided, only index column if it's a valid doc
-          if (validDocIds == null || validDocIds.contains(docId)) {
-            indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
-                nullVec);
-            onDiskDocId++;
-          }
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec);
         }
       }
     }
   }
 
+  /**
+   * Index a column using a ColumnReader (column-major approach).
+   * This method processes the column values using the iterator pattern from ColumnReader.
+   *
+   * @param columnName Name of the column to index
+   * @param columnReader ColumnReader for the column data
+   * @throws IOException if indexing fails
+   */
+  @Override
+  public void indexColumn(String columnName, ColumnReader columnReader) throws IOException {
+    Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+    NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+    Object defaultNullValue = fieldSpec.getDefaultNullValue();
+    Object reuseColumnValueToIndex;
+
+    // Reset column reader to start from beginning
+    columnReader.rewind();
+
+    int docId = 0;
+    while (columnReader.hasNext()) {
+      reuseColumnValueToIndex = columnReader.next();
+
+      // Handle null values
+      if (nullVec != null && reuseColumnValueToIndex == null) {
+        nullVec.setNull(docId);
+        reuseColumnValueToIndex = defaultNullValue;
+      }
+
+      if (fieldSpec.isSingleValueField()) {
+        indexSingleValueRow(dictionaryCreator, reuseColumnValueToIndex, creatorsByIndex);
+      } else {
+        indexMultiValueRow(dictionaryCreator, (Object[]) reuseColumnValueToIndex, creatorsByIndex);
+      }

Review Comment:
   Catch JsonParseException and throw ColumnJsonParserException(columnName, jpe), similar to indexRow?
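The wrapping the reviewer describes can be sketched as below. Both exception classes here are local stand-ins so the sketch is self-contained; the real code would catch Jackson's `JsonParseException` and throw Pinot's column-scoped exception as `indexRow()` does.

```java
// Sketch of wrapping a parse failure with the column name so the error
// identifies which column produced the bad value.
public class ColumnParseWrap {
  static class JsonParseException extends RuntimeException {
    JsonParseException(String msg) { super(msg); }
  }

  static class ColumnJsonParserException extends RuntimeException {
    ColumnJsonParserException(String columnName, Throwable cause) {
      super("Failed to parse JSON in column: " + columnName, cause);
    }
  }

  // Runs the indexing step and rethrows parse failures with column context.
  static void indexValue(String columnName, Runnable indexer) {
    try {
      indexer.run();
    } catch (JsonParseException jpe) {
      throw new ColumnJsonParserException(columnName, jpe);
    }
  }

  public static void main(String[] args) {
    try {
      indexValue("payload", () -> { throw new JsonParseException("bad json"); });
    } catch (ColumnJsonParserException e) {
      System.out.println(e.getMessage());
    }
  }
}
```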



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -677,4 +706,63 @@ public int getSkippedRowsFound() {
   public int getSanitizedRowsFound() {
     return _sanitizedRowsFound;
   }
+
+  /**
+   * Build segment using columnar approach.
+   * This method builds the segment by processing data column-wise instead of row-wise.
+   *
+   * @throws Exception if segment building fails
+   */
+  public void buildColumnar() throws Exception {

Review Comment:
   We need to handle continueOnError cases here too.
   
   And in the case of column readers, the added complexity is that we shouldn't index any column of a row if any single column results in an exception.
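One way to get that row-atomicity in a column-major build is a two-pass scheme: first scan every column to mark rows where any value fails, then index all columns skipping the marked rows. The sketch below is a hedged illustration; the `List`-of-`List` layout and the `Predicate` stand in for ColumnReaders and the real per-value validation.

```java
import java.util.BitSet;
import java.util.List;
import java.util.function.Predicate;

// Sketch: pre-compute the set of bad rows across all columns so that no column
// indexes any value belonging to a row that failed in some other column.
public class RowAtomicSkip {
  static BitSet findBadRows(List<List<Object>> columns, Predicate<Object> isBad) {
    BitSet badRows = new BitSet();
    for (List<Object> column : columns) {
      for (int docId = 0; docId < column.size(); docId++) {
        if (isBad.test(column.get(docId))) {
          badRows.set(docId); // one bad value invalidates the whole row
        }
      }
    }
    return badRows;
  }

  public static void main(String[] args) {
    List<List<Object>> columns = List.of(List.of(1, 2, 3), List.of("a", "oops", "c"));
    BitSet badRows = findBadRows(columns, v -> "oops".equals(v));
    System.out.println(badRows);
  }
}
```

The indexing pass would then iterate each reader and call the per-value indexer only for docIds not set in the bitmap, keeping on-disk docIds contiguous.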



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/ColumnarSegmentCreationDataSource.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator;
+
+import java.util.Map;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.ColumnarSegmentPreIndexStatsContainer;
+import org.apache.pinot.segment.spi.creator.SegmentCreationDataSource;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.ColumnReaderFactory;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SegmentCreationDataSource implementation that uses ColumnReaderFactory for columnar data access.
+ *
+ * <p>This data source enables columnar segment building by providing access to column readers
+ * instead of row-based record readers. It supports:
+ * <ul>
+ *   <li>Columnar statistics collection</li>
+ *   <li>Column-wise data access through ColumnReaderFactory</li>
+ *   <li>Efficient handling of new columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ */
+public class ColumnarSegmentCreationDataSource implements SegmentCreationDataSource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentCreationDataSource.class);
+
+  private final ColumnReaderFactory _columnReaderFactory;

Review Comment:
   I think we can remove _columnReaderFactory from this class



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> _columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> columnReaders,
+      StatsCollectorConfig statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      switch (fieldSpec.getDataType().getStoredType()) {
+        case INT:
+          _columnStatsCollectorMap.put(columnName,
+              new IntColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case LONG:
+          _columnStatsCollectorMap.put(columnName,
+              new LongColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case FLOAT:
+          _columnStatsCollectorMap.put(columnName,
+              new FloatColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case DOUBLE:
+          _columnStatsCollectorMap.put(columnName,
+              new DoubleColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(columnName,
+              new BigDecimalColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case STRING:
+          _columnStatsCollectorMap.put(columnName,
+              new StringColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case BYTES:
+          _columnStatsCollectorMap.put(columnName,
+              new BytesColumnPredIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        case MAP:
+          _columnStatsCollectorMap.put(columnName,
+              new MapColumnPreIndexStatsCollector(columnName, _statsCollectorConfig));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + fieldSpec.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Collect stats by iterating column-wise using the provided ColumnReader instances.
+   */
+  private void collectColumnStats() {
+    LOGGER.info("Collecting stats for {} columns using column-wise iteration", _columnReaders.size());
+
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      AbstractColumnStatisticsCollector statsCollector = _columnStatsCollectorMap.get(columnName);
+      ColumnReader columnReader = _columnReaders.get(columnName);
+
+      if (columnReader == null) {
+        throw new RuntimeException("Column reader for column " + columnName + " not found");
+      }
+
+      LOGGER.debug("Collecting stats for column: {}", columnName);
+      collectStatsFromColumnReader(columnName, columnReader, statsCollector);
+
+      // Seal the stats collector
+      statsCollector.seal();
+    }
+  }
+
+  /**
+   * Collect stats from a column reader by iterating over all values using the iterator pattern.
+   */
+  private void collectStatsFromColumnReader(String columnName, ColumnReader columnReader,
+      AbstractColumnStatisticsCollector statsCollector) {
+    try {
+      // Reset the column reader to start from the beginning
+      columnReader.rewind();
+
+      int docCount = 0;
+      while (columnReader.hasNext()) {
+        Object value = columnReader.next();
+        statsCollector.collect(value);
+        docCount++;
+      }
+
+      // if totalDocCount is unset then set total doc count from the first column
+      if (_totalDocCount == -1) {
+        _totalDocCount = docCount;
+      } else if (_totalDocCount != docCount) {
+        // all columns should have same count
+        LOGGER.warn("Column {} has {} documents, but expected {} documents",
Review Comment:
   I guess this is probably not fine and we should throw an exception in such a case; otherwise, how will we map the data to the right row?
   
   In case it is fine, should we update _totalDocCount to the max of _totalDocCount and docCount?
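The fail-fast option the reviewer prefers could look like the sketch below. This is a hedged illustration with made-up method names; the real check would live in `collectStatsFromColumnReader` and compare against `_totalDocCount`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: throw on a document-count mismatch instead of only warning, since
// misaligned columns can no longer be mapped back to the right rows.
public class DocCountCheck {
  static int validateDocCount(Map<String, Integer> docCountByColumn) {
    int expected = -1; // -1 means "unset", as in the container's _totalDocCount
    for (Map.Entry<String, Integer> entry : docCountByColumn.entrySet()) {
      if (expected == -1) {
        expected = entry.getValue();
      } else if (expected != entry.getValue()) {
        throw new IllegalStateException("Column " + entry.getKey() + " has " + entry.getValue()
            + " documents, expected " + expected);
      }
    }
    return expected;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("a", 3);
    counts.put("b", 3);
    System.out.println(validateDocCount(counts));
  }
}
```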



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -396,27 +372,58 @@ public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSe
       if (sortedDocIds != null) {
         int onDiskDocId = 0;
         for (int docId : sortedDocIds) {
-          // If validDodIds are provided, only index column if it's a valid doc
-          if (validDocIds == null || validDocIds.contains(docId)) {
-            indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
-                nullVec);
-            onDiskDocId++;
-          }
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec);
+          onDiskDocId++;
         }
       } else {
-        int onDiskDocId = 0;
         for (int docId = 0; docId < numDocs; docId++) {
-          // If validDodIds are provided, only index column if it's a valid doc
-          if (validDocIds == null || validDocIds.contains(docId)) {
-            indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
-                nullVec);
-            onDiskDocId++;
-          }
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec);
         }
       }
     }
   }
 
+  /**
+   * Index a column using a ColumnReader (column-major approach).
+   * This method processes the column values using the iterator pattern from ColumnReader.
+   *
+   * @param columnName Name of the column to index
+   * @param columnReader ColumnReader for the column data
+   * @throws IOException if indexing fails
+   */
+  @Override
+  public void indexColumn(String columnName, ColumnReader columnReader) throws IOException {
+    Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+    NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+    SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+    Object defaultNullValue = fieldSpec.getDefaultNullValue();
+    Object reuseColumnValueToIndex;
+
+    // Reset column reader to start from beginning
+    columnReader.rewind();
+
+    int docId = 0;
+    while (columnReader.hasNext()) {
+      reuseColumnValueToIndex = columnReader.next();
+
+      // Handle null values
+      if (nullVec != null && reuseColumnValueToIndex == null) {
+        nullVec.setNull(docId);
+        reuseColumnValueToIndex = defaultNullValue;
Review Comment:
   What happens if the field doesn't support null (I guess defaultNullValue is null or nullVec is null)? Should we error out here and handle it via the continueOnError flag where indexColumn() is called?
   
   For reference, indexRow() throws an exception if row.getValue(columnName) is null.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -209,7 +249,7 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
     }
 
     _ingestionSchemaValidator =
-        SchemaValidatorFactory.getSchemaValidator(_dataSchema, _recordReader.getClass().getName(),
+        SchemaValidatorFactory.getSchemaValidator(_dataSchema, readerClassName,

Review Comment:
   We're relying on getSchemaValidator to support a null readerClassName, which isn't ideal.
   
   Can we think of a more appropriate way to handle this?
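One hypothetical alternative is to make the absence of a record reader explicit rather than smuggling in null, e.g. via an `Optional`-returning lookup or a dedicated overload for columnar sources. Everything in this sketch is a stand-in, not Pinot's real `SchemaValidatorFactory` API.

```java
import java.util.Optional;

// Sketch: columnar sources have no record reader, so they ask for a validator
// through a path that can legitimately return "none" instead of passing null.
public class ValidatorLookup {
  interface SchemaValidator { }

  static Optional<SchemaValidator> getSchemaValidator(Optional<String> readerClassName) {
    // No reader class name means no reader-specific validation to run.
    return readerClassName.map(name -> new SchemaValidator() { });
  }

  public static void main(String[] args) {
    System.out.println(getSchemaValidator(Optional.empty()).isPresent());
  }
}
```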



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -677,4 +706,63 @@ public int getSkippedRowsFound() {
   public int getSanitizedRowsFound() {
     return _sanitizedRowsFound;
   }
+
+  /**
+   * Build segment using columnar approach.
+   * This method builds the segment by processing data column-wise instead of row-wise.
+   *
+   * @throws Exception if segment building fails
+   */
+  public void buildColumnar() throws Exception {
+    if (!(_dataSource instanceof ColumnarSegmentCreationDataSource)) {
+      throw new IllegalStateException("buildColumnar() can only be called after initColumnar()");
+    }
+
+    ColumnarSegmentCreationDataSource columnarDataSource = (ColumnarSegmentCreationDataSource) _dataSource;
+    Map<String, ColumnReader> columnReaders = columnarDataSource.getColumnReaders();
+
+    LOGGER.info("Starting columnar segment building for {} columns", columnReaders.size());
+
+    // Reuse existing stats collection and index creation info logic
+    LOGGER.debug("Start building StatsCollector!");
+    collectStatsAndIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    if (_totalDocs == 0) {
+      LOGGER.warn("No documents found in data source");
+      handlePostCreation();
+      return;
+    }
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir, null);
+
+      // Build the indexes column-wise (true column-major approach)
+      LOGGER.info("Start building Index using columnar approach");
+      long indexStartTime = System.nanoTime();
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+      for (String columnName : columns) {
+        LOGGER.debug("Indexing column: {}", columnName);
+        ColumnReader columnReader = columnReaders.get(columnName);
+        if (columnReader == null) {
+          throw new IllegalStateException("No column reader found for column: " + columnName);
+        }
+
+        // Index each column independently using true column-major approach
+        // This is similar to how buildByColumn works but uses ColumnReader instead of IndexSegment
+        ((SegmentColumnarIndexCreator) _indexCreator).indexColumn(columnName, columnReader);
+      }
+
+      _totalIndexTimeNs = System.nanoTime() - indexStartTime;
+    } catch (Exception e) {
+      _indexCreator.close();
+      throw e;
+    }
Review Comment:
   we need to close all column readers 
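The cleanup the reviewer wants is the classic close-everything-even-on-failure pattern, sketched below with `AutoCloseable` as a stand-in for the column readers (a `finally` block around the build would call this).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: close every reader even if some close() calls fail, keeping the
// first exception and attaching later ones as suppressed.
public class ReaderCleanup {
  static void closeAll(List<? extends AutoCloseable> readers) throws Exception {
    Exception first = null;
    for (AutoCloseable reader : readers) {
      try {
        reader.close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> closed = new ArrayList<>();
    closeAll(List.<AutoCloseable>of(() -> closed.add("a"), () -> closed.add("b")));
    System.out.println(closed);
  }
}
```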



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java:
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.ColumnReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ColumnReaderFactory implementation for immutable Pinot segments.
+ *
+ * <p>This factory creates ColumnReader instances for reading data from Pinot segments
+ * in a columnar fashion. It handles:
+ * <ul>
+ *   <li>Creating readers for existing columns in the segment</li>
+ *   <li>Creating default value readers for new columns</li>
+ *   <li>Resource management for all created readers</li>
+ * </ul>
+ */
+public class PinotSegmentColumnReaderFactory implements ColumnReaderFactory {

Review Comment:
   I think this class / interface needs to be refactored a bit:
   1. init() shouldn't take targetSchema, as that can be passed in the constructor itself.
   2. getAllColumnReaders() calls createColumnReader() and both are public, which isn't ideal. Rather, init() should call createColumnReader(), createColumnReader() should be private, and getAllColumnReaders() should just return existing objects without creating new ones.
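The suggested shape can be sketched as follows. This is a hedged outline, not the real factory: `ColumnReader` is a stand-in interface and the column-name list stands in for the target schema passed via the constructor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: schema in the constructor, reader creation kept private inside init(),
// and getAllColumnReaders() only returning what was already built.
public class ReaderFactorySketch {
  interface ColumnReader { }

  private final List<String> _columnNames;
  private final Map<String, ColumnReader> _readers = new HashMap<>();

  ReaderFactorySketch(List<String> columnNames) {
    _columnNames = columnNames;
  }

  void init() {
    for (String column : _columnNames) {
      _readers.put(column, createColumnReader(column));
    }
  }

  private ColumnReader createColumnReader(String column) {
    // real impl would choose an existing-column reader vs a default-value reader
    return new ColumnReader() { };
  }

  Map<String, ColumnReader> getAllColumnReaders() {
    return Collections.unmodifiableMap(_readers); // no object creation on access
  }

  public static void main(String[] args) {
    ReaderFactorySketch factory = new ReaderFactorySketch(List.of("id", "name"));
    factory.init();
    System.out.println(factory.getAllColumnReaders().size());
  }
}
```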



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
