9aman commented on code in PR #17304:
URL: https://github.com/apache/pinot/pull/17304#discussion_r2609688639


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/ColumnReaderTransformer.java:
##########
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+
+
+/**
+ * A decorator for {@link ColumnReader} that applies a chain of {@link 
ColumnTransformer}s to column data during
+ * ingestion. This class serves as the primary mechanism for transforming raw 
input data into Pinot's internal
+ * representation while maintaining optimal performance.
+ *
+ * <h3>Role and Responsibilities</h3>
+ * <p>
+ * The ColumnReaderTransformer wraps a source {@link ColumnReader} and 
transparently applies transformations to
+ * values as they are read. Built-in transformers include:
+ * <ul>
+ *   <li>{@link DataTypeColumnTransformer} - Handles data type conversions and 
validations</li>
+ *   <li>{@link NullValueColumnTransformer} - Manages null value handling and 
default value substitution</li>
+ *   <li>Additional custom transformers - User-defined transformations for 
specific use cases</li>
+ * </ul>
+ * </p>
+ *
+ * <h3>Performance Optimization Strategy</h3>
+ * <p>
+ * This class implements several critical performance optimizations to 
minimize overhead during data ingestion:
+ * </p>
+ *
+ * <h4>1. Transformer Segregation</h4>
+ * <p>
+ * The transformers are divided into two groups:
+ * <ul>
+ *   <li><b>_allTransformers</b> - All transformers including {@link 
NullValueColumnTransformer}</li>
+ *   <li><b>_allTransformersExceptNullTransformer</b> - All transformers 
except {@link NullValueColumnTransformer}</li>
+ * </ul>
+ * This segregation is necessary because {@link NullValueColumnTransformer} 
requires boxing primitive types to
+ * {@link Object}, which introduces significant performance overhead. By 
separating it, we can avoid this cost
+ * for primitive numeric types when null handling is not needed.
+ * </p>
+ *
+ * <h4>2. Type-Specific Optimization</h4>
+ * <p>
+ * The class optimizes differently based on data type:
+ * <ul>
+ *   <li><b>Primitive numeric types (int, long, float, double)</b> - Uses 
{@code _allTransformersExceptNullTransformer}
+ *       to avoid boxing overhead. When no transformers are present, directly 
delegates to the underlying reader
+ *       for zero transformation cost.</li>
+ *   <li><b>String and byte[] types</b> - Always uses {@code _allTransformers} 
since these are already reference
+ *       types and don't incur boxing penalties.</li>
+ * </ul>
+ * </p>
+ *
+ * <h4>3. Fast Path for Zero Transformers</h4>
+ * <p>
+ * When {@code _allTransformersExceptNullTransformer} is empty (common for 
primitive types with no transformations),
+ * all primitive accessor methods (e.g., {@link #nextInt()}, {@link 
#getLong(int)}) bypass transformation logic
+ * entirely and delegate directly to the underlying reader. This ensures zero 
overhead when no transformations are
+ * actually needed.
+ * </p>
+ *
+ * <h3>Null Value Handling</h3>
+ * <p>
+ * Special attention is paid to null value detection in the {@link 
#isNull(int)} method. This method must return
+ * {@code true} not only when the source value is null, but also when any 
transformer would produce a null result.
+ * This ensures that null vector indexes are built correctly even when 
transformers modify null semantics.
+ * However, the {@link NullValueColumnTransformer} itself is excluded from 
this check since it converts nulls
+ * to default values - we still want to track the original null state for 
index building purposes.
+ * </p>
+ *
+ * @see ColumnReader
+ * @see ColumnTransformer
+ * @see DataTypeColumnTransformer
+ * @see NullValueColumnTransformer
+ */
+public class ColumnReaderTransformer implements ColumnReader {
+
+  private final List<ColumnTransformer> _allTransformers;
+
+  // Transformers except NullValueColumnTransformer
+  // Since NullValueColumnTransformer is required for all columns, but has 
perf overhead (casting to Object),
+  // we separate it out to avoid calling it unless necessary.
+  private final List<ColumnTransformer> _allTransformersExceptNullTransformer;
+  private final ColumnReader _columnReader;
+
+  /**
+   * Creates a ColumnReaderTransformer with only built-in transformers (no 
additional custom transformers).
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader) {
+    this(tableConfig, schema, fieldSpec, columnReader, new ArrayList<>());
+  }
+
+  /**
+   * Creates a ColumnReaderTransformer with both built-in and additional 
custom transformers.
+   * The additional transformers are applied last in the order provided
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   * @param additionalTransformers Additional custom transformers to apply 
after built-in transformers.
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader, List<ColumnTransformer> 
additionalTransformers) {
+    _columnReader = columnReader;
+    _allTransformers = new ArrayList<>();
+    addIfNotNoOp(_allTransformers, new DataTypeColumnTransformer(tableConfig, 
fieldSpec, columnReader));
+    addIfNotNoOp(_allTransformers, new NullValueColumnTransformer(tableConfig, 
fieldSpec, schema));
+    for (ColumnTransformer transformer : additionalTransformers) {
+      addIfNotNoOp(_allTransformers, transformer);
+    }
+
+    _allTransformersExceptNullTransformer = new ArrayList<>();
+    for (ColumnTransformer transformer : _allTransformers) {
+      if (!(transformer instanceof NullValueColumnTransformer)) {
+        _allTransformersExceptNullTransformer.add(transformer);
+      }
+    }
+  }
+
+  private static void addIfNotNoOp(List<ColumnTransformer> transformers, 
@Nullable ColumnTransformer transformer) {
+    if (transformer != null && !transformer.isNoOp()) {
+      transformers.add(transformer);
+    }
+  }
+
+  private Object applyTransformers(Object value, List<ColumnTransformer> 
transformers) {
+    for (ColumnTransformer transformer : transformers) {
+      value = transformer.transform(value);
+    }
+    return value;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _columnReader.hasNext();
+  }
+
+  @Nullable
+  @Override
+  public Object next()
+      throws IOException {
+    return applyTransformers(_columnReader.next(), _allTransformers);
+  }
+
+  @Override
+  public boolean isNextNull()
+      throws IOException {
+    // TODO - consider checking transformers for null response similar to 
logic isNull(docId)
+    //  It requires peeking the next value without advancing the reader.
+    //  Once peek is supported in ColumnReader, we can implement this 
correctly.
+    return _columnReader.isNextNull();
+  }
+
+  @Override
+  public void skipNext()
+      throws IOException {
+    _columnReader.skipNext();
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _columnReader.isSingleValue();
+  }
+
+  @Override
+  public boolean isInt() {
+    return _columnReader.isInt();
+  }
+
+  @Override
+  public boolean isLong() {
+    return _columnReader.isLong();
+  }
+
+  @Override
+  public boolean isFloat() {
+    return _columnReader.isFloat();
+  }
+
+  @Override
+  public boolean isDouble() {
+    return _columnReader.isDouble();
+  }
+
+  @Override
+  public boolean isString() {
+    return _columnReader.isString();
+  }
+
+  @Override
+  public boolean isBytes() {
+    return _columnReader.isBytes();
+  }
+
+  @Override
+  public int nextInt()

Review Comment:
   On a similar note, won't this lead to an issue if a `long` is returned when
the caller is promised an `int`, as can happen with `DataTypeColumnTransformer`?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {

Review Comment:
   Can it happen that we have more than one `DataTypeColumnTransformer` in the
chain?
   Is that allowed right now?
   
   Say String -> Int -> certain operation -> String again ? 
   
   If so, `isNoOp()` will end up giving the wrong response, as it relies on the
data type of the underlying reader (the actual source data) and not on the
intermediate data type that the transformer actually receives.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/ColumnReaderTransformer.java:
##########
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+
+
+/**
+ * A decorator for {@link ColumnReader} that applies a chain of {@link 
ColumnTransformer}s to column data during
+ * ingestion. This class serves as the primary mechanism for transforming raw 
input data into Pinot's internal
+ * representation while maintaining optimal performance.
+ *
+ * <h3>Role and Responsibilities</h3>
+ * <p>
+ * The ColumnReaderTransformer wraps a source {@link ColumnReader} and 
transparently applies transformations to
+ * values as they are read. Built-in transformers include:
+ * <ul>
+ *   <li>{@link DataTypeColumnTransformer} - Handles data type conversions and 
validations</li>
+ *   <li>{@link NullValueColumnTransformer} - Manages null value handling and 
default value substitution</li>
+ *   <li>Additional custom transformers - User-defined transformations for 
specific use cases</li>
+ * </ul>
+ * </p>
+ *
+ * <h3>Performance Optimization Strategy</h3>
+ * <p>
+ * This class implements several critical performance optimizations to 
minimize overhead during data ingestion:
+ * </p>
+ *
+ * <h4>1. Transformer Segregation</h4>
+ * <p>
+ * The transformers are divided into two groups:
+ * <ul>
+ *   <li><b>_allTransformers</b> - All transformers including {@link 
NullValueColumnTransformer}</li>
+ *   <li><b>_allTransformersExceptNullTransformer</b> - All transformers 
except {@link NullValueColumnTransformer}</li>
+ * </ul>
+ * This segregation is necessary because {@link NullValueColumnTransformer} 
requires boxing primitive types to
+ * {@link Object}, which introduces significant performance overhead. By 
separating it, we can avoid this cost
+ * for primitive numeric types when null handling is not needed.
+ * </p>
+ *
+ * <h4>2. Type-Specific Optimization</h4>
+ * <p>
+ * The class optimizes differently based on data type:
+ * <ul>
+ *   <li><b>Primitive numeric types (int, long, float, double)</b> - Uses 
{@code _allTransformersExceptNullTransformer}
+ *       to avoid boxing overhead. When no transformers are present, directly 
delegates to the underlying reader
+ *       for zero transformation cost.</li>
+ *   <li><b>String and byte[] types</b> - Always uses {@code _allTransformers} 
since these are already reference
+ *       types and don't incur boxing penalties.</li>
+ * </ul>
+ * </p>
+ *
+ * <h4>3. Fast Path for Zero Transformers</h4>
+ * <p>
+ * When {@code _allTransformersExceptNullTransformer} is empty (common for 
primitive types with no transformations),
+ * all primitive accessor methods (e.g., {@link #nextInt()}, {@link 
#getLong(int)}) bypass transformation logic
+ * entirely and delegate directly to the underlying reader. This ensures zero 
overhead when no transformations are
+ * actually needed.
+ * </p>
+ *
+ * <h3>Null Value Handling</h3>
+ * <p>
+ * Special attention is paid to null value detection in the {@link 
#isNull(int)} method. This method must return
+ * {@code true} not only when the source value is null, but also when any 
transformer would produce a null result.
+ * This ensures that null vector indexes are built correctly even when 
transformers modify null semantics.
+ * However, the {@link NullValueColumnTransformer} itself is excluded from 
this check since it converts nulls
+ * to default values - we still want to track the original null state for 
index building purposes.
+ * </p>
+ *
+ * @see ColumnReader
+ * @see ColumnTransformer
+ * @see DataTypeColumnTransformer
+ * @see NullValueColumnTransformer
+ */
+public class ColumnReaderTransformer implements ColumnReader {
+
+  private final List<ColumnTransformer> _allTransformers;
+
+  // Transformers except NullValueColumnTransformer
+  // Since NullValueColumnTransformer is required for all columns, but has 
perf overhead (casting to Object),
+  // we separate it out to avoid calling it unless necessary.
+  private final List<ColumnTransformer> _allTransformersExceptNullTransformer;
+  private final ColumnReader _columnReader;
+
+  /**
+   * Creates a ColumnReaderTransformer with only built-in transformers (no 
additional custom transformers).
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader) {
+    this(tableConfig, schema, fieldSpec, columnReader, new ArrayList<>());
+  }
+
+  /**
+   * Creates a ColumnReaderTransformer with both built-in and additional 
custom transformers.
+   * The additional transformers are applied last in the order provided
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   * @param additionalTransformers Additional custom transformers to apply 
after built-in transformers.
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader, List<ColumnTransformer> 
additionalTransformers) {
+    _columnReader = columnReader;
+    _allTransformers = new ArrayList<>();
+    addIfNotNoOp(_allTransformers, new DataTypeColumnTransformer(tableConfig, 
fieldSpec, columnReader));
+    addIfNotNoOp(_allTransformers, new NullValueColumnTransformer(tableConfig, 
fieldSpec, schema));
+    for (ColumnTransformer transformer : additionalTransformers) {
+      addIfNotNoOp(_allTransformers, transformer);
+    }
+
+    _allTransformersExceptNullTransformer = new ArrayList<>();
+    for (ColumnTransformer transformer : _allTransformers) {
+      if (!(transformer instanceof NullValueColumnTransformer)) {
+        _allTransformersExceptNullTransformer.add(transformer);
+      }
+    }
+  }
+
+  private static void addIfNotNoOp(List<ColumnTransformer> transformers, 
@Nullable ColumnTransformer transformer) {
+    if (transformer != null && !transformer.isNoOp()) {
+      transformers.add(transformer);
+    }
+  }
+
+  private Object applyTransformers(Object value, List<ColumnTransformer> 
transformers) {
+    for (ColumnTransformer transformer : transformers) {
+      value = transformer.transform(value);
+    }
+    return value;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _columnReader.hasNext();
+  }
+
+  @Nullable
+  @Override
+  public Object next()
+      throws IOException {
+    return applyTransformers(_columnReader.next(), _allTransformers);
+  }
+
+  @Override
+  public boolean isNextNull()
+      throws IOException {
+    // TODO - consider checking transformers for null response similar to 
logic isNull(docId)
+    //  It requires peeking the next value without advancing the reader.
+    //  Once peek is supported in ColumnReader, we can implement this 
correctly.
+    return _columnReader.isNextNull();
+  }
+
+  @Override
+  public void skipNext()
+      throws IOException {
+    _columnReader.skipNext();
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _columnReader.isSingleValue();
+  }
+
+  @Override
+  public boolean isInt() {
+    return _columnReader.isInt();

Review Comment:
   Won't this return wrong results when we have a
   `DataTypeColumnTransformer`?
   Say the underlying type is `int` and the desired type after transformation
is `long`.
   
   I think `isInt` should then return `false` and `isLong` should return `true`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, 
no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always 
call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, 
_destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data 
type for column: " + columnName, e);

Review Comment:
   Same as below. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/ColumnReaderTransformer.java:
##########
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+
+
+/**
+ * A decorator for {@link ColumnReader} that applies a chain of {@link 
ColumnTransformer}s to column data during
+ * ingestion. This class serves as the primary mechanism for transforming raw 
input data into Pinot's internal
+ * representation while maintaining optimal performance.
+ *
+ * <h3>Role and Responsibilities</h3>
+ * <p>
+ * The ColumnReaderTransformer wraps a source {@link ColumnReader} and 
transparently applies transformations to
+ * values as they are read. Built-in transformers include:
+ * <ul>
+ *   <li>{@link DataTypeColumnTransformer} - Handles data type conversions and 
validations</li>
+ *   <li>{@link NullValueColumnTransformer} - Manages null value handling and 
default value substitution</li>
+ *   <li>Additional custom transformers - User-defined transformations for 
specific use cases</li>
+ * </ul>
+ * </p>
+ *
+ * <h3>Performance Optimization Strategy</h3>
+ * <p>
+ * This class implements several critical performance optimizations to 
minimize overhead during data ingestion:
+ * </p>
+ *
+ * <h4>1. Transformer Segregation</h4>
+ * <p>
+ * The transformers are divided into two groups:
+ * <ul>
+ *   <li><b>_allTransformers</b> - All transformers including {@link 
NullValueColumnTransformer}</li>
+ *   <li><b>_allTransformersExceptNullTransformer</b> - All transformers 
except {@link NullValueColumnTransformer}</li>
+ * </ul>
+ * This segregation is necessary because {@link NullValueColumnTransformer} 
requires boxing primitive types to
+ * {@link Object}, which introduces significant performance overhead. By 
separating it, we can avoid this cost
+ * for primitive numeric types when null handling is not needed.
+ * </p>
+ *
+ * <h4>2. Type-Specific Optimization</h4>
+ * <p>
+ * The class optimizes differently based on data type:
+ * <ul>
+ *   <li><b>Primitive numeric types (int, long, float, double)</b> - Uses 
{@code _allTransformersExceptNullTransformer}
+ *       to avoid boxing overhead. When no transformers are present, directly 
delegates to the underlying reader
+ *       for zero transformation cost.</li>
+ *   <li><b>String and byte[] types</b> - Always uses {@code _allTransformers} 
since these are already reference
+ *       types and don't incur boxing penalties.</li>
+ * </ul>
+ * </p>
+ *
+ * <h4>3. Fast Path for Zero Transformers</h4>
+ * <p>
+ * When {@code _allTransformersExceptNullTransformer} is empty (common for 
primitive types with no transformations),
+ * all primitive accessor methods (e.g., {@link #nextInt()}, {@link 
#getLong(int)}) bypass transformation logic
+ * entirely and delegate directly to the underlying reader. This ensures zero 
overhead when no transformations are
+ * actually needed.
+ * </p>
+ *
+ * <h3>Null Value Handling</h3>
+ * <p>
+ * Special attention is paid to null value detection in the {@link 
#isNull(int)} method. This method must return
+ * {@code true} not only when the source value is null, but also when any 
transformer would produce a null result.
+ * This ensures that null vector indexes are built correctly even when 
transformers modify null semantics.
+ * However, the {@link NullValueColumnTransformer} itself is excluded from 
this check since it converts nulls
+ * to default values - we still want to track the original null state for 
index building purposes.
+ * </p>
+ *
+ * @see ColumnReader
+ * @see ColumnTransformer
+ * @see DataTypeColumnTransformer
+ * @see NullValueColumnTransformer
+ */
+public class ColumnReaderTransformer implements ColumnReader {
+
+  private final List<ColumnTransformer> _allTransformers;
+
+  // Transformers except NullValueColumnTransformer
+  // Since NullValueColumnTransformer is required for all columns, but has 
perf overhead (casting to Object),
+  // we separate it out to avoid calling it unless necessary.
+  private final List<ColumnTransformer> _allTransformersExceptNullTransformer;
+  private final ColumnReader _columnReader;
+
+  /**
+   * Creates a ColumnReaderTransformer with only built-in transformers (no 
additional custom transformers).
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader) {
+    this(tableConfig, schema, fieldSpec, columnReader, new ArrayList<>());
+  }
+
+  /**
+   * Creates a ColumnReaderTransformer with both built-in and additional 
custom transformers.
+   * The additional transformers are applied last in the order provided
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   * @param additionalTransformers Additional custom transformers to apply 
after built-in transformers.
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader, List<ColumnTransformer> 
additionalTransformers) {
+    _columnReader = columnReader;
+    _allTransformers = new ArrayList<>();
+    addIfNotNoOp(_allTransformers, new DataTypeColumnTransformer(tableConfig, 
fieldSpec, columnReader));
+    addIfNotNoOp(_allTransformers, new NullValueColumnTransformer(tableConfig, 
fieldSpec, schema));
+    for (ColumnTransformer transformer : additionalTransformers) {
+      addIfNotNoOp(_allTransformers, transformer);
+    }
+
+    _allTransformersExceptNullTransformer = new ArrayList<>();
+    for (ColumnTransformer transformer : _allTransformers) {
+      if (!(transformer instanceof NullValueColumnTransformer)) {
+        _allTransformersExceptNullTransformer.add(transformer);
+      }
+    }
+  }
+
+  private static void addIfNotNoOp(List<ColumnTransformer> transformers, 
@Nullable ColumnTransformer transformer) {
+    if (transformer != null && !transformer.isNoOp()) {
+      transformers.add(transformer);
+    }
+  }
+
+  private Object applyTransformers(Object value, List<ColumnTransformer> 
transformers) {
+    for (ColumnTransformer transformer : transformers) {
+      value = transformer.transform(value);
+    }
+    return value;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _columnReader.hasNext();
+  }
+
+  @Nullable
+  @Override
+  public Object next()
+      throws IOException {
+    return applyTransformers(_columnReader.next(), _allTransformers);
+  }
+
+  @Override
+  public boolean isNextNull()
+      throws IOException {
+    // TODO - consider checking transformers for null response similar to 
logic isNull(docId)
+    //  It requires peeking the next value without advancing the reader.
+    //  Once peek is supported in ColumnReader, we can implement this 
correctly.
+    return _columnReader.isNextNull();
+  }
+
+  @Override
+  public void skipNext()
+      throws IOException {
+    _columnReader.skipNext();
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _columnReader.isSingleValue();
+  }
+
+  @Override
+  public boolean isInt() {
+    return _columnReader.isInt();
+  }
+
+  @Override
+  public boolean isLong() {
+    return _columnReader.isLong();
+  }
+
+  @Override
+  public boolean isFloat() {
+    return _columnReader.isFloat();
+  }
+
+  @Override
+  public boolean isDouble() {
+    return _columnReader.isDouble();
+  }
+
+  @Override
+  public boolean isString() {
+    return _columnReader.isString();
+  }
+
+  @Override
+  public boolean isBytes() {
+    return _columnReader.isBytes();
+  }
+
+  @Override
+  public int nextInt()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextInt();
+    } else {
+      return (int) applyTransformers(_columnReader.nextInt(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public long nextLong()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextLong();
+    } else {
+      return (long) applyTransformers(_columnReader.nextLong(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public float nextFloat()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextFloat();
+    } else {
+      return (float) applyTransformers(_columnReader.nextFloat(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public double nextDouble()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextDouble();
+    } else {
+      return (double) applyTransformers(_columnReader.nextDouble(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public String nextString()
+      throws IOException {
+    return (String) applyTransformers(_columnReader.nextString(), 
_allTransformers);
+  }
+
+  @Override
+  public byte[] nextBytes()
+      throws IOException {
+    return (byte[]) applyTransformers(_columnReader.nextBytes(), 
_allTransformers);
+  }
+
+  @Override
+  public int[] nextIntMV()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextIntMV();
+    } else {
+      return (int[]) applyTransformers(_columnReader.nextIntMV(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public long[] nextLongMV()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextLongMV();
+    } else {
+      return (long[]) applyTransformers(_columnReader.nextLongMV(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public float[] nextFloatMV()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextFloatMV();
+    } else {
+      return (float[]) applyTransformers(_columnReader.nextFloatMV(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public double[] nextDoubleMV()
+      throws IOException {
+    if (_allTransformersExceptNullTransformer.isEmpty()) {
+      // If there are no transformers, avoid casting to Object unnecessarily
+      return _columnReader.nextDoubleMV();
+    } else {
+      return (double[]) applyTransformers(_columnReader.nextDoubleMV(), 
_allTransformersExceptNullTransformer);
+    }
+  }
+
+  @Override
+  public String[] nextStringMV()
+      throws IOException {
+    return (String[]) applyTransformers(_columnReader.nextStringMV(), 
_allTransformers);
+  }
+
+  @Override
+  public byte[][] nextBytesMV()
+      throws IOException {
+    return (byte[][]) applyTransformers(_columnReader.nextBytesMV(), 
_allTransformers);
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _columnReader.rewind();
+  }
+
+  @Override
+  public String getColumnName() {
+    return _columnReader.getColumnName();
+  }
+
+  @Override
+  public int getTotalDocs() {
+    return _columnReader.getTotalDocs();
+  }
+
+  // Check if the value itself is null or if any of the transformers would 
return null for the value
+  // The latter is important because NullValueColumnTransformer will transform 
null to default value
+  // In those cases, we still want isNull to return true so that the null 
vector index can be built correctly
+  @Override
+  public boolean isNull(int docId)

Review Comment:
   Is this behavior similar to that of the row-level transformation?
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/ColumnReaderTransformer.java:
##########
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+
+
+/**
+ * A decorator for {@link ColumnReader} that applies a chain of {@link 
ColumnTransformer}s to column data during
+ * ingestion. This class serves as the primary mechanism for transforming raw 
input data into Pinot's internal
+ * representation while maintaining optimal performance.
+ *
+ * <h3>Role and Responsibilities</h3>
+ * <p>
+ * The ColumnReaderTransformer wraps a source {@link ColumnReader} and 
transparently applies transformations to
+ * values as they are read. Built-in transformers include:
+ * <ul>
+ *   <li>{@link DataTypeColumnTransformer} - Handles data type conversions and 
validations</li>
+ *   <li>{@link NullValueColumnTransformer} - Manages null value handling and 
default value substitution</li>
+ *   <li>Additional custom transformers - User-defined transformations for 
specific use cases</li>
+ * </ul>
+ * </p>
+ *
+ * <h3>Performance Optimization Strategy</h3>
+ * <p>
+ * This class implements several critical performance optimizations to 
minimize overhead during data ingestion:
+ * </p>
+ *
+ * <h4>1. Transformer Segregation</h4>
+ * <p>
+ * The transformers are divided into two groups:
+ * <ul>
+ *   <li><b>_allTransformers</b> - All transformers including {@link 
NullValueColumnTransformer}</li>
+ *   <li><b>_allTransformersExceptNullTransformer</b> - All transformers 
except {@link NullValueColumnTransformer}</li>
+ * </ul>
+ * This segregation is necessary because {@link NullValueColumnTransformer} 
requires boxing primitive types to
+ * {@link Object}, which introduces significant performance overhead. By 
separating it, we can avoid this cost
+ * for primitive numeric types when null handling is not needed.
+ * </p>
+ *
+ * <h4>2. Type-Specific Optimization</h4>
+ * <p>
+ * The class optimizes differently based on data type:
+ * <ul>
+ *   <li><b>Primitive numeric types (int, long, float, double)</b> - Uses 
{@code _allTransformersExceptNullTransformer}
+ *       to avoid boxing overhead. When no transformers are present, directly 
delegates to the underlying reader
+ *       for zero transformation cost.</li>
+ *   <li><b>String and byte[] types</b> - Always uses {@code _allTransformers} 
since these are already reference
+ *       types and don't incur boxing penalties.</li>
+ * </ul>
+ * </p>
+ *
+ * <h4>3. Fast Path for Zero Transformers</h4>
+ * <p>
+ * When {@code _allTransformersExceptNullTransformer} is empty (common for 
primitive types with no transformations),
+ * all primitive accessor methods (e.g., {@link #nextInt()}, {@link 
#getLong(int)}) bypass transformation logic
+ * entirely and delegate directly to the underlying reader. This ensures zero 
overhead when no transformations are
+ * actually needed.
+ * </p>
+ *
+ * <h3>Null Value Handling</h3>
+ * <p>
+ * Special attention is paid to null value detection in the {@link 
#isNull(int)} method. This method must return
+ * {@code true} not only when the source value is null, but also when any 
transformer would produce a null result.
+ * This ensures that null vector indexes are built correctly even when 
transformers modify null semantics.
+ * However, the {@link NullValueColumnTransformer} itself is excluded from 
this check since it converts nulls
+ * to default values - we still want to track the original null state for 
index building purposes.
+ * </p>
+ *
+ * @see ColumnReader
+ * @see ColumnTransformer
+ * @see DataTypeColumnTransformer
+ * @see NullValueColumnTransformer
+ */
+public class ColumnReaderTransformer implements ColumnReader {
+
+  private final List<ColumnTransformer> _allTransformers;
+
+  // Transformers except NullValueColumnTransformer
+  // Since NullValueColumnTransformer is required for all columns, but has 
perf overhead (casting to Object),
+  // we separate it out to avoid calling it unless necessary.
+  private final List<ColumnTransformer> _allTransformersExceptNullTransformer;
+  private final ColumnReader _columnReader;
+
+  /**
+   * Creates a ColumnReaderTransformer with only built-in transformers (no 
additional custom transformers).
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader) {
+    this(tableConfig, schema, fieldSpec, columnReader, new ArrayList<>());
+  }
+
+  /**
+   * Creates a ColumnReaderTransformer with both built-in and additional 
custom transformers.
+   * The additional transformers are applied last in the order provided
+   *
+   * @param fieldSpec The field specification for the column being transformed 
and created in Pinot
+   * @param columnReader The source column reader to read raw data from
+   * @param additionalTransformers Additional custom transformers to apply 
after built-in transformers.
+   */
+  public ColumnReaderTransformer(TableConfig tableConfig, Schema schema,
+      FieldSpec fieldSpec, ColumnReader columnReader, List<ColumnTransformer> 
additionalTransformers) {
+    _columnReader = columnReader;
+    _allTransformers = new ArrayList<>();
+    addIfNotNoOp(_allTransformers, new DataTypeColumnTransformer(tableConfig, 
fieldSpec, columnReader));
+    addIfNotNoOp(_allTransformers, new NullValueColumnTransformer(tableConfig, 
fieldSpec, schema));
+    for (ColumnTransformer transformer : additionalTransformers) {
+      addIfNotNoOp(_allTransformers, transformer);
+    }
+
+    _allTransformersExceptNullTransformer = new ArrayList<>();
+    for (ColumnTransformer transformer : _allTransformers) {
+      if (!(transformer instanceof NullValueColumnTransformer)) {
+        _allTransformersExceptNullTransformer.add(transformer);
+      }
+    }
+  }
+
+  private static void addIfNotNoOp(List<ColumnTransformer> transformers, 
@Nullable ColumnTransformer transformer) {
+    if (transformer != null && !transformer.isNoOp()) {
+      transformers.add(transformer);
+    }
+  }
+
+  private Object applyTransformers(Object value, List<ColumnTransformer> 
transformers) {
+    for (ColumnTransformer transformer : transformers) {
+      value = transformer.transform(value);
+    }
+    return value;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _columnReader.hasNext();
+  }
+
+  @Nullable
+  @Override
+  public Object next()
+      throws IOException {
+    return applyTransformers(_columnReader.next(), _allTransformers);
+  }
+
+  @Override
+  public boolean isNextNull()
+      throws IOException {
+    // TODO - consider checking transformers for null response similar to 
logic isNull(docId)
+    //  It requires peeking the next value without advancing the reader.
+    //  Once peek is supported in ColumnReader, we can implement this 
correctly.
+    return _columnReader.isNextNull();
+  }
+
+  @Override
+  public void skipNext()
+      throws IOException {
+    _columnReader.skipNext();
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _columnReader.isSingleValue();
+  }
+
+  @Override
+  public boolean isInt() {
+    return _columnReader.isInt();
+  }
+
+  @Override
+  public boolean isLong() {
+    return _columnReader.isLong();
+  }
+
+  @Override
+  public boolean isFloat() {
+    return _columnReader.isFloat();
+  }
+
+  @Override
+  public boolean isDouble() {
+    return _columnReader.isDouble();
+  }
+
+  @Override
+  public boolean isString() {
+    return _columnReader.isString();
+  }
+
+  @Override
+  public boolean isBytes() {
+    return _columnReader.isBytes();
+  }
+
+  @Override
+  public int nextInt()

Review Comment:
   Or, in case of incompatibility, we might get a `ClassCastException`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java:
##########


Review Comment:
   Will these changes in any way impact `PinotSegmentColumnReaderFactory`,
   given that the factory relies on these column readers?
   
   See `_indexCreator.indexColumn(columnName, columnReader);`
   in `SegmentIndexCreationDriverImpl.java`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java:
##########
@@ -58,56 +49,6 @@ public DefaultValueColumnReader(String columnName, int 
numDocs, FieldSpec fieldS
     _numDocs = numDocs;
     _currentIndex = 0;
     _dataType = fieldSpec.getDataType();
-
-    // For multi-value fields, wrap the default value in an array

Review Comment:
   I guess these changes are in reference to this discussion: 
https://github.com/apache/pinot/pull/17293#discussion_r2602485648



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, 
no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always 
call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, 
_destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data 
type for column: " + columnName, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for 
column: " + columnName, e);
+      return null;

Review Comment:
   I see the above behavior is similar.
   
   It seems `record.markIncomplete();` is used for two purposes:
   1. Generating a metric to keep count of such records.
   2. In complex transforms.
   
   Are we planning to capture this info in some other form?
   Will it impact our ability to perform complex transformations?
   



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, 
no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always 
call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, 
_destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data 
type for column: " + columnName, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for 
column: " + columnName, e);

Review Comment:
   Can we log the destination data type and the value here?
   
   I am not sure whether we log values, given the security concerns.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec 
fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, 
no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always 
call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, 
_destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data 
type for column: " + columnName, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for 
column: " + columnName, e);
+      return null;

Review Comment:
   Will this impact the null index? Is this behavior similar to that of the
   row-level transformations?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to