noob-se7en commented on code in PR #16727:
URL: https://github.com/apache/pinot/pull/16727#discussion_r2414761599


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -171,31 +170,72 @@ public void init(SegmentGeneratorConfig config, 
RecordReader recordReader)
         new TransformPipeline(config.getTableConfig(), config.getSchema()));
   }
 
+  /**
+   * Initialize the driver for columnar segment building using a 
ColumnReaderFactory.
+   * This method sets up the driver to use column-wise input data access 
instead of row-wise.
+   *
+   * @param config Segment generator configuration
+   * @param columnReaderFactory Factory for creating column readers
+   * @throws Exception if initialization fails
+   */
+  public void init(SegmentGeneratorConfig config, ColumnReaderFactory 
columnReaderFactory)
+      throws Exception {
+    // Initialize the column reader factory with target schema
+    columnReaderFactory.init(config.getSchema());
+
+    // Create all column readers for the target schema
+    Map<String, ColumnReader> columnReaders = 
columnReaderFactory.getAllColumnReaders();
+
+    // Create columnar data source
+    ColumnarSegmentCreationDataSource columnarDataSource =
+        new ColumnarSegmentCreationDataSource(columnReaderFactory, 
columnReaders);
+
+    // Use the existing init method with columnar data source and no transform 
pipeline
+    init(config, columnarDataSource, null);
+
+    LOGGER.info("Initialized SegmentIndexCreationDriverImpl for columnar 
building with {} columns",

Review Comment:
   nit:
   ```suggestion
       LOGGER.info("Initialized SegmentIndexCreationDriverImpl for columnar 
data source building with {} columns",
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Factory interface for creating ColumnReader instances for different data 
sources.
+ *
+ * <p>This factory provides a generic way to create column readers for various 
data sources
+ * such as Pinot segments, Parquet files, etc. The factory handles:
+ * <ul>
+ *   <li>Creating column readers for existing columns</li>
+ *   <li>Creating default value readers for new columns</li>
+ *   <li>Data type conversions between source and target schemas</li>
+ *   <li>Resource management</li>
+ * </ul>
+ */
+public interface ColumnReaderFactory extends Closeable, Serializable {
+
+  /**
+   * Initialize the factory with the data source and target schema.
+   *
+   * @param targetSchema Target schema for the output segment
+   * @throws IOException If initialization fails
+   */
+  void init(Schema targetSchema) throws IOException;
+
+  /**
+   * Get the set of column names available in the source data.
+   *
+   * @return Set of available column names in the source
+   */
+  Set<String> getAvailableColumns();
+
+  /**
+   * Get the total number of documents/rows in the data source.
+   *
+   * @return Total number of documents
+   */
+  int getNumDocs();
+
+  /**
+   * Create a column reader for the specified column.
+   *
+   * @param columnName Name of the column to read
+   * @param targetFieldSpec Target field specification from the output schema
+   * @return ColumnReader instance for the specified column
+   * @throws IOException If the column reader cannot be created
+   */
+  ColumnReader createColumnReader(String columnName, FieldSpec 
targetFieldSpec) throws IOException;
+
+  /**
+   * Get all column readers for the target schema.
+   * This is a convenience method that creates readers for all columns in the 
target schema.
+   *
+   * @return Map of column name to ColumnReader
+   * @throws IOException If any column reader cannot be created
+   */
+  Map<String, ColumnReader> getAllColumnReaders() throws IOException;
+
+  /**
+   * Check if the specified column exists in the source data.
+   *
+   * @param columnName Column name to check
+   * @return true if the column exists in source, false otherwise
+   */
+  boolean hasColumn(String columnName);

Review Comment:
   nit: Does this need to be publicly exposed? Applicable for other methods 
like `getNumDocs()` as well.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -677,4 +706,63 @@ public int getSkippedRowsFound() {
   public int getSanitizedRowsFound() {
     return _sanitizedRowsFound;
   }
+
+  /**
+   * Build segment using columnar approach.
+   * This method builds the segment by processing data column-wise instead of 
row-wise.
+   *
+   * @throws Exception if segment building fails
+   */
+  public void buildColumnar() throws Exception {
+    if (!(_dataSource instanceof ColumnarSegmentCreationDataSource)) {
+      throw new IllegalStateException("buildColumnar() can only be called 
after initColumnar()");
+    }
+
+    ColumnarSegmentCreationDataSource columnarDataSource = 
(ColumnarSegmentCreationDataSource) _dataSource;
+    Map<String, ColumnReader> columnReaders = 
columnarDataSource.getColumnReaders();
+
+    LOGGER.info("Starting columnar segment building for {} columns", 
columnReaders.size());
+
+    // Reuse existing stats collection and index creation info logic
+    LOGGER.debug("Start building StatsCollector!");
+    collectStatsAndIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    if (_totalDocs == 0) {
+      LOGGER.warn("No documents found in data source");
+      handlePostCreation();
+      return;
+    }
+
+    try {
+      // Initialize the index creation using the per-column statistics 
information
+      _indexCreator.init(_config, _segmentIndexCreationInfo, 
_indexCreationInfoMap, _dataSchema, _tempIndexDir, null);
+
+      // Build the indexes column-wise (true column-major approach)
+      LOGGER.info("Start building Index using columnar approach");
+      long indexStartTime = System.nanoTime();
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+      for (String columnName : columns) {
+        LOGGER.debug("Indexing column: {}", columnName);
+        ColumnReader columnReader = columnReaders.get(columnName);
+        if (columnReader == null) {
+          throw new IllegalStateException("No column reader found for column: 
" + columnName);
+        }
+
+        // Index each column independently using true column-major approach
+        // This is similar to how buildByColumn works but uses ColumnReader 
instead of IndexSegment
+        ((SegmentColumnarIndexCreator) _indexCreator).indexColumn(columnName, 
columnReader);

Review Comment:
   nit: reduntant cast?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java:
##########
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.ColumnReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ColumnReaderFactory implementation for immutable Pinot segments.
+ *
+ * <p>This factory creates ColumnReader instances for reading data from Pinot 
segments
+ * in a columnar fashion. It handles:
+ * <ul>
+ *   <li>Creating readers for existing columns in the segment</li>
+ *   <li>Creating default value readers for new columns</li>
+ *   <li>Resource management for all created readers</li>
+ * </ul>
+ */
+public class PinotSegmentColumnReaderFactory implements ColumnReaderFactory {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSegmentColumnReaderFactory.class);
+
+  private final IndexSegment _indexSegment;
+  private Schema _targetSchema;
+  private final Map<String, ColumnReader> _columnReaders;
+
+  /**
+   * Create a PinotSegmentColumnReaderFactory.
+   *
+   * @param indexSegment Source segment to read from
+   */
+  public PinotSegmentColumnReaderFactory(IndexSegment indexSegment) {
+    _indexSegment = indexSegment;
+    _columnReaders = new HashMap<>();
+  }
+
+  @Override
+  public void init(Schema targetSchema) throws IOException {
+    _targetSchema = targetSchema;
+    LOGGER.info("Initialized PinotSegmentColumnReaderFactory with target 
schema containing {} columns",
+        targetSchema.getPhysicalColumnNames().size());
+  }
+
+  @Override
+  public Set<String> getAvailableColumns() {
+    return _indexSegment.getPhysicalColumnNames();
+  }
+
+  @Override
+  public int getNumDocs() {
+    return _indexSegment.getSegmentMetadata().getTotalDocs();
+  }
+
+  @Override
+  public ColumnReader createColumnReader(String columnName, FieldSpec 
targetFieldSpec) throws IOException {

Review Comment:
   nit: 'IOException' is never thrown in the method 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data 
using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead 
of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the 
row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise 
iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements 
SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> 
_columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> 
columnReaders,
+                                              StatsCollectorConfig 
statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();

Review Comment:
   nit:
   ```suggestion
       _columnStatsCollectorMap = new HashMap<>(columnReaders.size());
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data 
using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead 
of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the 
row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise 
iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements 
SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> 
_columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> 
columnReaders,
+                                              StatsCollectorConfig 
statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      switch (fieldSpec.getDataType().getStoredType()) {
+        case INT:
+          _columnStatsCollectorMap.put(columnName,
+              new IntColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case LONG:
+          _columnStatsCollectorMap.put(columnName,
+              new LongColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case FLOAT:
+          _columnStatsCollectorMap.put(columnName,
+              new FloatColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case DOUBLE:
+          _columnStatsCollectorMap.put(columnName,
+              new DoubleColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(columnName,
+              new BigDecimalColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case STRING:
+          _columnStatsCollectorMap.put(columnName,
+              new StringColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case BYTES:
+          _columnStatsCollectorMap.put(columnName,
+              new BytesColumnPredIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case MAP:
+          _columnStatsCollectorMap.put(columnName,
+              new MapColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + 
fieldSpec.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Collect stats by iterating column-wise using the provided ColumnReader 
instances.
+   */
+  private void collectColumnStats() {
+    LOGGER.info("Collecting stats for {} columns using column-wise iteration", 
_columnReaders.size());
+
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      AbstractColumnStatisticsCollector statsCollector = 
_columnStatsCollectorMap.get(columnName);
+      ColumnReader columnReader = _columnReaders.get(columnName);
+
+      if (columnReader == null) {
+        throw new RuntimeException("Column reader for column " + columnName + 
" not found");
+      }
+
+      LOGGER.debug("Collecting stats for column: {}", columnName);
+      collectStatsFromColumnReader(columnName, columnReader, statsCollector);
+
+      // Seal the stats collector
+      statsCollector.seal();
+    }
+  }
+
+  /**
+   * Collect stats from a column reader by iterating over all values using the 
iterator pattern.
+   */
+  private void collectStatsFromColumnReader(String columnName, ColumnReader 
columnReader,
+                                           AbstractColumnStatisticsCollector 
statsCollector) {

Review Comment:
   nit: lint fix



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/ColumnarSegmentPreIndexStatsContainer.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.stats;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stats container that efficiently collects statistics from columnar data 
using ColumnReader instances.
+ *
+ * <p>This implementation collects statistics by iterating column-wise instead 
of row-wise,
+ * which is more efficient for columnar data sources. It supports:
+ * <ul>
+ *   <li>Column-wise statistics collection</li>
+ *   <li>Existing columns from source data</li>
+ *   <li>New columns with default values</li>
+ *   <li>Data type conversions during schema evolution</li>
+ * </ul>
+ *
+ * <p>The statistics are collected using the same underlying collectors as the 
row-based approach
+ * (SegmentPreIndexStatsCollectorImpl) but with more efficient column-wise 
iteration.
+ */
+public class ColumnarSegmentPreIndexStatsContainer implements 
SegmentPreIndexStatsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ColumnarSegmentPreIndexStatsContainer.class);
+
+  private final Map<String, ColumnReader> _columnReaders;
+  private final StatsCollectorConfig _statsCollectorConfig;
+  private final Schema _targetSchema;
+  private final Map<String, AbstractColumnStatisticsCollector> 
_columnStatsCollectorMap;
+  private int _totalDocCount;
+
+  /**
+   * Create a ColumnarSegmentPreIndexStatsContainer.
+   *
+   * @param columnReaders Map of column name to ColumnReader instances
+   * @param statsCollectorConfig Configuration for statistics collection
+   */
+  public ColumnarSegmentPreIndexStatsContainer(Map<String, ColumnReader> 
columnReaders,
+                                              StatsCollectorConfig 
statsCollectorConfig) {
+    _columnReaders = columnReaders;
+    _statsCollectorConfig = statsCollectorConfig;
+    _targetSchema = statsCollectorConfig.getSchema();
+    _columnStatsCollectorMap = new HashMap<>();
+    _totalDocCount = -1; // indicates unset
+
+    initializeStatsCollectors();
+    collectColumnStats();
+  }
+
+  /**
+   * Initialize stats collectors for all columns in the target schema.
+   */
+  private void initializeStatsCollectors() {
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }
+
+      String columnName = fieldSpec.getName();
+      switch (fieldSpec.getDataType().getStoredType()) {
+        case INT:
+          _columnStatsCollectorMap.put(columnName,
+              new IntColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case LONG:
+          _columnStatsCollectorMap.put(columnName,
+              new LongColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case FLOAT:
+          _columnStatsCollectorMap.put(columnName,
+              new FloatColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case DOUBLE:
+          _columnStatsCollectorMap.put(columnName,
+              new DoubleColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case BIG_DECIMAL:
+          _columnStatsCollectorMap.put(columnName,
+              new BigDecimalColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case STRING:
+          _columnStatsCollectorMap.put(columnName,
+              new StringColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case BYTES:
+          _columnStatsCollectorMap.put(columnName,
+              new BytesColumnPredIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        case MAP:
+          _columnStatsCollectorMap.put(columnName,
+              new MapColumnPreIndexStatsCollector(columnName, 
_statsCollectorConfig));
+          break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + 
fieldSpec.getDataType());
+      }
+    }
+  }
+
+  /**
+   * Collect stats by iterating column-wise using the provided ColumnReader 
instances.
+   */
+  private void collectColumnStats() {
+    LOGGER.info("Collecting stats for {} columns using column-wise iteration", 
_columnReaders.size());
+
+    for (FieldSpec fieldSpec : _targetSchema.getAllFieldSpecs()) {
+      if (fieldSpec.isVirtualColumn()) {
+        continue;
+      }

Review Comment:
   We can iterate on just `_columnStatsCollectorMap` to avoid this



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -677,4 +706,63 @@ public int getSkippedRowsFound() {
   public int getSanitizedRowsFound() {
     return _sanitizedRowsFound;
   }
+
+  /**
+   * Build segment using columnar approach.
+   * This method builds the segment by processing data column-wise instead of 
row-wise.
+   *
+   * @throws Exception if segment building fails
+   */
+  public void buildColumnar() throws Exception {

Review Comment:
   nit: should be public?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to