This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b292c1ac4c Make record extractors self-contained; drop BaseRecordExtractor.convert dispatcher (#18436)
7b292c1ac4c is described below

commit 7b292c1ac4cf8d1c3e7bfe7d4fc01a8631d0b31f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 7 11:30:24 2026 -0700

    Make record extractors self-contained; drop BaseRecordExtractor.convert dispatcher (#18436)
---
 .../inputformat/avro/AvroRecordExtractor.java      |  53 ++--
 .../inputformat/clplog/CLPLogRecordExtractor.java  |  86 +++---
 .../clplog/CLPLogRecordExtractorTest.java          |  47 +++
 .../plugin/inputformat/csv/CSVRecordExtractor.java |   3 +-
 .../inputformat/json/JSONRecordExtractor.java      |  73 ++++-
 .../inputformat/json/JSONRecordExtractorTest.java  |   2 +-
 .../plugin/inputformat/orc/ORCRecordExtractor.java |   4 +-
 .../parquet/ParquetAvroRecordExtractor.java        |   2 +-
 .../parquet/ParquetNativeRecordExtractor.java      |  49 +--
 .../protobuf/ProtoBufRecordExtractor.java          |  26 +-
 .../inputformat/thrift/ThriftRecordExtractor.java  | 133 +++++---
 .../thrift/ThriftRecordExtractorTest.java          |  27 +-
 .../spi/data/readers/BaseRecordExtractor.java      | 183 +----------
 .../spi/data/readers/BaseRecordExtractorTest.java  | 343 ++-------------------
 14 files changed, 357 insertions(+), 674 deletions(-)
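
Reading aid only, not part of the commit: a minimal standalone sketch of the per-extractor `convert` walk that this change adds to `JSONRecordExtractor` and `CLPLogRecordExtractor`. The class name `JsonValueConverterSketch` and its `main` method are hypothetical, and a plain `HashMap` stands in for Guava's `Maps.newHashMapWithExpectedSize` so that the snippet needs only the JDK.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone class; in the commit this logic lives inside each extractor.
final class JsonValueConverterSketch {

  /** Converts a non-null Jackson-style value into the shape the extractors emit. */
  @SuppressWarnings("unchecked")
  static Object convert(Object value) {
    // BigInteger widens to BigDecimal (Pinot has no BigInteger type).
    if (value instanceof BigInteger) {
      return new BigDecimal((BigInteger) value);
    }
    // JSON array: List -> Object[], converting each element recursively.
    if (value instanceof List) {
      List<Object> list = (List<Object>) value;
      Object[] result = new Object[list.size()];
      for (int i = 0; i < result.length; i++) {
        Object v = list.get(i);
        result[i] = v != null ? convert(v) : null;
      }
      return result;
    }
    // JSON object: Map -> Map<String, Object>, converting each value recursively.
    if (value instanceof Map) {
      Map<String, Object> map = (Map<String, Object>) value;
      // Assumption: plain HashMap instead of Guava's pre-sized map helper.
      Map<String, Object> result = new HashMap<>();
      for (Map.Entry<String, Object> entry : map.entrySet()) {
        Object v = entry.getValue();
        result.put(entry.getKey(), v != null ? convert(v) : null);
      }
      return result;
    }
    // Boolean / Integer / Long / Double / String pass through unchanged.
    return value;
  }

  public static void main(String[] args) {
    Map<String, Object> input = new HashMap<>();
    input.put("big", new BigInteger("99999999999999999999999999"));
    input.put("list", List.of(1, "a", true));

    @SuppressWarnings("unchecked")
    Map<String, Object> out = (Map<String, Object>) convert(input);
    System.out.println(out.get("big"));                              // 99999999999999999999999999
    System.out.println(out.get("big") instanceof BigDecimal);        // true
    System.out.println(Arrays.toString((Object[]) out.get("list"))); // [1, a, true]
  }
}
```

Callers keep the null check outside `convert`, mirroring the `value != null ? convert(value) : null` pattern in the diff below.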

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 43c086bd8f6..f27f4246d40 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -44,39 +44,32 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
 
 
-/// Extracts Pinot [GenericRow] from an Avro [GenericRecord]. A single schema-driven walk produces values
-/// that satisfy the `RecordExtractor` contract directly — no second pass through `BaseRecordExtractor#convert`.
+/// Extracts Pinot [GenericRow] from an Avro [GenericRecord].
 ///
 /// **Avro source type → Java input → Java output type:**
-/// - avro `boolean` → `Boolean` → `Boolean`
-/// - avro `int` → `Integer` → `Integer`
-/// - avro `long` → `Long` → `Long`
-/// - avro `float` → `Float` → `Float`
-/// - avro `double` → `Double` → `Double`
-/// - avro `string` → `Utf8` → `String` (via `Utf8.toString()`)
-/// - avro `bytes` → `ByteBuffer` → `byte[]` (materialized from the buffer's remaining content)
-/// - avro `fixed` → `GenericFixed` / `GenericData.Fixed` → `byte[]`
-/// - avro `enum` → `GenericData.EnumSymbol` → `String` (enum name)
-/// - avro `array<T>` → `List<T>` → `Object[]` (each element recursively converted)
-/// - avro `map<string, T>` → `Map<Utf8, T>` → `Map<String, Object>` (each value recursively converted)
-/// - avro nested `record` → `GenericRecord` → `Map<String, Object>`
-/// - avro `union[null, X]` with the `null` branch selected → `null`
+/// - `boolean` → `Boolean` → `Boolean`
+/// - `int` → `Integer` → `Integer`
+/// - `long` → `Long` → `Long`
+/// - `float` → `Float` → `Float`
+/// - `double` → `Double` → `Double`
+/// - `string` → `Utf8` → `String` (via `Utf8.toString()`)
+/// - `bytes` → `ByteBuffer` → `byte[]` (materialized from the buffer's remaining content)
+/// - `fixed` → `GenericFixed` / `GenericData.Fixed` → `byte[]`
+/// - `enum` → `GenericData.EnumSymbol` → `String` (enum name)
+/// - `array<T>` → `List<T>` → `Object[]` (each element recursively converted)
+/// - `map<string, T>` → `Map<Utf8, T>` → `Map<String, Object>` (each value recursively converted)
+/// - `record` → `GenericRecord` → `Map<String, Object>`
+/// - `union[null, X]` with the `null` branch selected → `null`
 ///
-/// **Logical types** (the reader emits the raw Avro physical type; this extractor applies the conversion via
-///   [#CONVERSION_MAP] to produce the Pinot contract type):
-/// - `decimal` / `big-decimal` → raw `ByteBuffer` → [BigDecimal] (always converted; raw bytes aren't interpretable
-///   without external precision/scale)
-/// - `timestamp-millis` → raw `Long` → [Timestamp], or `Long` raw epoch millis when `extractRawTimeValues` is `true`
-/// - `timestamp-micros` → raw `Long` → [Timestamp] (sub-millisecond micros preserved), or `Long` raw epoch micros when
-///   `extractRawTimeValues` is `true`
-/// - `timestamp-nanos` → raw `Long` → [Timestamp] (nanosecond precision preserved), or `Long` raw epoch nanos when
-///   `extractRawTimeValues` is `true`
-/// - `date` → raw `Integer` → [LocalDate], or `Integer` raw days-since-epoch when `extractRawTimeValues` is `true`
-/// - `time-millis` → raw `Integer` → [LocalTime], or `Integer` raw ms-since-midnight when `extractRawTimeValues` is
-///   `true`
-/// - `time-micros` → raw `Long` → [LocalTime], or `Long` raw µs-since-midnight when `extractRawTimeValues` is `true`
-/// - `uuid` → raw `Utf8` / `GenericFixed` → [UUID] (always converted; downstream type transformer adapts to the Pinot
-///   column type — `STRING` column gets canonical UUID string, `BYTES` column gets 16-byte big-endian form)
+/// **Logical types:**
+/// - `decimal` / `big-decimal` → `ByteBuffer` → [BigDecimal]
+/// - `timestamp-millis` → `Long` → [Timestamp], or `Long` epoch millis when `extractRawTimeValues` is `true`
+/// - `timestamp-micros` → `Long` → [Timestamp], or `Long` epoch micros when `extractRawTimeValues` is `true`
+/// - `timestamp-nanos` → `Long` → [Timestamp], or `Long` epoch nanos when `extractRawTimeValues` is `true`
+/// - `date` → `Integer` → [LocalDate], or `Integer` days-since-epoch when `extractRawTimeValues` is `true`
+/// - `time-millis` → `Integer` → [LocalTime], or `Integer` ms-since-midnight when `extractRawTimeValues` is `true`
+/// - `time-micros` → `Long` → [LocalTime], or `Long` µs-since-midnight when `extractRawTimeValues` is `true`
+/// - `uuid` → `Utf8` / `GenericFixed` → [UUID]
 public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
   private static final Conversion<BigDecimal> DECIMAL_CONVERSION = new Conversions.DecimalConversion();
   private static final Conversion<BigDecimal> BIG_DECIMAL_CONVERSION = new Conversions.BigDecimalConversion();
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index aebefbfe66f..06885200d34 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -20,11 +20,15 @@ package org.apache.pinot.plugin.inputformat.clplog;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
 import com.yscope.clp.compressorfrontend.EncodedMessage;
 import com.yscope.clp.compressorfrontend.MessageEncoder;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -41,33 +45,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * A record extractor for log events. For configuration options, see {@link CLPLogRecordExtractorConfig}. This is an
- * experimental feature.
- * <p></p>
- * The goal of this record extractor is to allow us to encode the fields specified in
- * {@link CLPLogRecordExtractorConfig} using CLP. CLP is a compressor designed to encode unstructured log messages in a
- * way that makes them more compressible. It does this by decomposing a message into three fields:
- * <ul>
- *   <li>the message's static text, called a log type;</li>
- *   <li>repetitive variable values, called dictionary variables; and</li>
- *   <li>non-repetitive variable values (called encoded variables since we encode them specially if possible).</li>
- * </ul>
- * For instance, if the field "message" is encoded, then the extractor will output:
- * <ul>
- *   <li>message_logtype</li>
- *   <li>message_dictionaryVars</li>
- *   <li>message_encodedVars</li>
- * </ul>
- * All remaining fields are processed in the same way as they are in
- * {@link org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor}. Specifically:
- * <ul>
- *   <li>If the caller passed a set of fields to {@code init}, then only those fields are extracted from each
- *   record and any remaining fields are dropped.</li>
- *   <li>Otherwise, all fields are extracted from each record.</li>
- * </ul>
- * This class' implementation is based on {@link org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor}.
- */
+/// Record extractor for log events. Experimental. For configuration options see [CLPLogRecordExtractorConfig].
+///
+/// Each field configured for CLP encoding is split into three sibling fields — `_logtype`,
+/// `_dictionaryVars`, `_encodedVars`. All other fields are extracted as plain JSON values.
 public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Object>> {
   // The maximum number of variables that can be stored in a cell (row of a single column).
   private static final int MAX_VARIABLES_PER_CELL = ForwardIndexType.MAX_MULTI_VALUES_PER_ROW;
@@ -139,10 +120,7 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
         if (clpEncodedFieldNames.contains(key)) {
           encodeFieldWithClp(key, value, to);
         } else {
-          if (value != null) {
-            value = convert(value);
-          }
-          to.putValue(key, value);
+          to.putValue(key, value != null ? convert(value) : null);
         }
       }
       return to;
@@ -151,10 +129,7 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
     // Handle un-encoded fields
     for (String fieldName : _fields) {
       Object value = from.get(fieldName);
-      if (value != null) {
-        value = convert(value);
-      }
-      to.putValue(fieldName, value);
+      to.putValue(fieldName, value != null ? convert(value) : null);
     }
 
     // Handle encoded fields
@@ -165,6 +140,47 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
     return to;
   }
 
+  /// Walks a non-null Jackson-parsed value and produces the contract shape: `BigDecimal` for `BigInteger`
+  /// (oversized ints), `Object[]` for JSON arrays, `Map<String, Object>` for JSON objects, pass-through for
+  /// the other Jackson scalar types.
+  private static Object convert(Object value) {
+    // BigInteger widens (Pinot has no BigInteger type)
+    if (value instanceof BigInteger) {
+      return new BigDecimal((BigInteger) value);
+    }
+    // List
+    if (value instanceof List) {
+      //noinspection unchecked
+      return convertList((List<Object>) value);
+    }
+    // Map
+    if (value instanceof Map) {
+      //noinspection unchecked
+      return convertMap((Map<String, Object>) value);
+    }
+    // Single value pass-through (Boolean / Integer / Long / Double / String)
+    return value;
+  }
+
+  private static Object[] convertList(List<Object> list) {
+    int n = list.size();
+    Object[] result = new Object[n];
+    for (int i = 0; i < n; i++) {
+      Object v = list.get(i);
+      result[i] = v != null ? convert(v) : null;
+    }
+    return result;
+  }
+
+  private static Map<String, Object> convertMap(Map<String, Object> map) {
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      Object v = entry.getValue();
+      result.put(entry.getKey(), v != null ? convert(v) : null);
+    }
+    return result;
+  }
+
   /**
    * Encodes a field with CLP
    * <p></p>
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 055481f7282..559cfa3bcd4 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -21,7 +21,11 @@ package org.apache.pinot.plugin.inputformat.clplog;
 import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
 import com.yscope.clp.compressorfrontend.MessageDecoder;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
@@ -88,6 +92,49 @@ public class CLPLogRecordExtractorTest {
     assertNull(row.getValue(MESSAGE_1_FIELD_NAME + ClpRewriter.LOGTYPE_COLUMN_SUFFIX));
   }
 
+  // === Un-encoded field conversion (same dispatch as JSONRecordExtractor) ===
+
+  @Test
+  public void testBigIntegerWidenedToBigDecimal() {
+    // Default Jackson parses integer literals that overflow `Long` as `BigInteger`. The extractor widens
+    // to `BigDecimal` since Pinot has no `BigInteger` type.
+    BigInteger value = new BigInteger("99999999999999999999999999");
+    GenericRow row = extractUnencoded("payload", value);
+    assertEquals(row.getValue("payload"), new BigDecimal(value));
+  }
+
+  @Test
+  public void testListExtractedAsArray() {
+    GenericRow row = extractUnencoded("payload", List.of(1, "a", true));
+    assertEquals((Object[]) row.getValue("payload"), new Object[]{1, "a", true});
+  }
+
+  @Test
+  public void testNestedMapRecursivelyConverted() {
+    // Inner List values become Object[]; inner BigInteger widens to BigDecimal.
+    GenericRow row = extractUnencoded("payload", Map.of(
+        "list", List.of(1, 2),
+        "big", new BigInteger("100")
+    ));
+    Map<?, ?> result = (Map<?, ?>) row.getValue("payload");
+    assertEquals((Object[]) result.get("list"), new Object[]{1, 2});
+    assertEquals(result.get("big"), new BigDecimal("100"));
+  }
+
+  /// Run the extractor with no CLP-encoded fields configured, so `payload` flows through the un-encoded
+  /// path (the same `convert` dispatch as JSON).
+  private GenericRow extractUnencoded(String fieldName, Object value) {
+    CLPLogRecordExtractorConfig extractorConfig = new CLPLogRecordExtractorConfig();
+    extractorConfig.init(Map.of());
+    CLPLogRecordExtractor extractor = new CLPLogRecordExtractor();
+    extractor.init(null, extractorConfig, TOPIC_NAME);
+    Map<String, Object> input = new HashMap<>();
+    input.put(fieldName, value);
+    GenericRow row = new GenericRow();
+    extractor.extract(input, row);
+    return row;
+  }
+
   @Test
   public void testPreserveTopicName() {
     // Without the destination-column config, the topic is not surfaced.
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
index f24386a4bc4..c91843a4c04 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
@@ -62,8 +62,9 @@ public class CSVRecordExtractor extends BaseRecordExtractor<CSVRecord> {
     return to;
   }
 
+  @Nullable
   private Object convert(@Nullable String value) {
-    if (value == null || StringUtils.isEmpty(value)) {
+    if (StringUtils.isEmpty(value)) {
       return null;
       // NOTE about CSV behavior for empty string e.g. foo,bar,,zoo or foo,bar,"",zoo. These both are equivalent to a
       // CSVParser
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
index 4ac55d1bdba..d2fbe09da64 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.plugin.inputformat.json;
 
+import com.google.common.collect.Maps;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -27,15 +31,15 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 /// no native bytes / float / big-decimal type.
 ///
 /// **JSON source type → Java input → Java output type:**
-/// - JSON `true` / `false` → `Boolean` → `Boolean`
-/// - JSON int that fits in 32 bits → `Integer` → `Integer`
-/// - JSON int that overflows 32 bits but fits in 64 → `Long` → `Long`
-/// - JSON int that overflows 64 bits → `BigInteger` → `BigDecimal` (widened by [BaseRecordExtractor])
-/// - JSON decimal → `Double` → `Double` (never `Float` or `BigDecimal` with default Jackson config)
-/// - JSON string → `String` → `String`
-/// - JSON `null` → `null` → `null`
-/// - JSON array → `List` → `Object[]` (each element recursively converted)
-/// - JSON object → `Map` → `Map<String, Object>` (each value recursively converted)
+/// - `true` / `false` → `Boolean` → `Boolean`
+/// - int that fits in 32 bits → `Integer` → `Integer`
+/// - int that overflows 32 bits but fits in 64 → `Long` → `Long`
+/// - int that overflows 64 bits → `BigInteger` → `BigDecimal` (Pinot has no `BigInteger` data type)
+/// - decimal → `Double` → `Double` (never `Float` or `BigDecimal` with default Jackson config)
+/// - string → `String` → `String`
+/// - `null` → `null` → `null`
+/// - array → `List` → `Object[]` (each element recursively converted)
+/// - object → `Map` → `Map<String, Object>` (each value recursively converted)
 public class JSONRecordExtractor extends BaseRecordExtractor<Map<String, Object>> {
 
   @Override
@@ -43,20 +47,55 @@ public class JSONRecordExtractor extends BaseRecordExtractor<Map<String, Object>
     if (_extractAll) {
       for (Map.Entry<String, Object> entry : from.entrySet()) {
         Object value = entry.getValue();
-        if (value != null) {
-          value = convert(value);
-        }
-        to.putValue(entry.getKey(), value);
+        to.putValue(entry.getKey(), value != null ? convert(value) : null);
       }
     } else {
       for (String fieldName : _fields) {
         Object value = from.get(fieldName);
-        if (value != null) {
-          value = convert(value);
-        }
-        to.putValue(fieldName, value);
+        to.putValue(fieldName, value != null ? convert(value) : null);
       }
     }
     return to;
   }
+
+  /// Walks a non-null Jackson-parsed value and produces the contract shape: `BigDecimal` for `BigInteger`
+  /// (oversized ints), `Object[]` for JSON arrays, `Map<String, Object>` for JSON objects, pass-through for
+  /// the other Jackson scalar types (`Boolean`, `Integer`, `Long`, `Double`, `String`).
+  private static Object convert(Object value) {
+    // BigInteger widens (Pinot has no BigInteger type)
+    if (value instanceof BigInteger) {
+      return new BigDecimal((BigInteger) value);
+    }
+    // List
+    if (value instanceof List) {
+      //noinspection unchecked
+      return convertList((List<Object>) value);
+    }
+    // Map
+    if (value instanceof Map) {
+      //noinspection unchecked
+      return convertMap((Map<String, Object>) value);
+    }
+    // Single value pass-through (Boolean / Integer / Long / Double / String)
+    return value;
+  }
+
+  private static Object[] convertList(List<Object> list) {
+    int n = list.size();
+    Object[] result = new Object[n];
+    for (int i = 0; i < n; i++) {
+      Object v = list.get(i);
+      result[i] = v != null ? convert(v) : null;
+    }
+    return result;
+  }
+
+  private static Map<String, Object> convertMap(Map<String, Object> map) {
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      Object v = entry.getValue();
+      result.put(entry.getKey(), v != null ? convert(v) : null);
+    }
+    return result;
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
index 1776466f30c..a67c42d6bfe 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
@@ -61,7 +61,7 @@ public class JSONRecordExtractorTest {
 
   @Test
   public void testBigIntegerWidenedToBigDecimal() {
-    // Default Jackson parses integer literals that overflow `Long` as `BigInteger`. The base widens to
+    // Default Jackson parses integer literals that overflow `Long` as `BigInteger`. The extractor widens to
     // `BigDecimal` since Pinot has no `BigInteger` type.
     BigInteger value = new BigInteger("99999999999999999999999999");
     Object result = extract(value);
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
index 5a98cb5414e..5967afc875d 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
@@ -44,9 +44,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 /// Extracts a single ORC row into a [GenericRow]. Input is an [ORCRecordExtractor.Record] handle wrapping
-/// a [VectorizedRowBatch] + schema + row index. Dispatch happens in [#extractValue] (complex types —
-/// `LIST` / `MAP` / `STRUCT`) and [#extractSingleValue] (primitives); ORC values never flow through
-/// `convertSingleValue`, so widening / `Temporal` handling is done locally here.
+/// a [VectorizedRowBatch] + schema + row index.
 ///
 /// **ORC schema category → Java output type:**
 /// - `BOOLEAN` → `Boolean`
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
index 309c9bb96df..48e6eddd739 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.utils.TimestampUtils;
 
 
 /// The type matrix is inherited from [AvroRecordExtractor]; the only override is the INT96 timestamp
-/// (which parquet-avro surfaces as `fixed(12)` with `doc = "INT96 represented as byte[12]"`) → [java.sql.Timestamp]
+/// (which parquet-avro surfaces as `fixed(12)` with `doc = "INT96 represented as byte[12]"`) → `Timestamp`
 /// (or `Long` epoch nanos when `extractRawTimeValues` is `true`) via [ParquetUtils#convertInt96ToEpochNanos].
 public class ParquetAvroRecordExtractor extends AvroRecordExtractor {
   private static final int INT96_BYTE_SIZE = 12;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index 7d557774f0c..3618f01d87c 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -39,10 +39,9 @@ import org.apache.pinot.spi.utils.UuidUtils;
 
 
 /// Extracts Pinot [GenericRow] from Parquet [Group] objects, using Parquet [LogicalTypeAnnotation]s to drive
-/// LIST and MAP handling per the Parquet LogicalTypes spec backward-compatibility rules. Output adheres to
-/// the `RecordExtractor` contract.
+/// LIST and MAP handling per the Parquet LogicalTypes spec backward-compatibility rules.
 ///
-/// **Primitive types** — preserved as their boxed Java type:
+/// **Primitive types:**
 /// - `BOOLEAN` → `Boolean`
 /// - `INT32` → `Integer`
 /// - `INT64` → `Long`
@@ -51,32 +50,22 @@ import org.apache.pinot.spi.utils.UuidUtils;
 /// - `BINARY` with `STRING` / `ENUM` annotation → `String`
 /// - `BINARY` / `FIXED_LEN_BYTE_ARRAY` without annotation → `byte[]`
 ///
-/// **Logical types** — Parquet logical-type annotations decoded into uniform Java types regardless of the
-/// underlying physical encoding:
-/// - `DECIMAL` (on `INT32` / `INT64` / `BINARY` / `FIXED_LEN_BYTE_ARRAY`) → `BigDecimal`. Always converted —
-///   raw bytes aren't interpretable without external precision / scale
-/// - `INT64` + `TIMESTAMP_MILLIS` / `MICROS` / `NANOS` → `Timestamp` (sub-millisecond nanos preserved via
-///   `Timestamp#setNanos`), or `Long` raw value in the column's declared unit when raw
-/// - `INT96` → `Timestamp` (sub-millisecond nanos preserved), or `Long` epoch nanos when raw (nanos is
-///   INT96's natural unit since its physical encoding — nanos-of-day + Julian day — carries nanosecond
-///   precision)
-/// - `INT32` + `DATE` → `LocalDate`, or `Integer` raw days-since-epoch when raw
-/// - `INT32` + `TIME_MILLIS` → `LocalTime`, or `Integer` raw ms-since-midnight when raw
-/// - `INT64` + `TIME_MICROS` / `TIME_NANOS` → `LocalTime` (full nanosecond precision), or `Long` raw
-///   value-since-midnight in the column's declared unit when raw
-/// - `FIXED_LEN_BYTE_ARRAY(16)` + `UUID` → `java.util.UUID`. Always converted; the downstream type
-///   transformer adapts to the Pinot column's storage type — `STRING` → canonical form, `BYTES` →
-///   16-byte big-endian form
+/// **Logical types:**
+/// - `DECIMAL` (on `INT32` / `INT64` / `BINARY` / `FIXED_LEN_BYTE_ARRAY`) → `BigDecimal`
+/// - `INT64` + `TIMESTAMP_MILLIS` / `MICROS` / `NANOS` → `Timestamp`, or `Long` in the column's declared unit
+///   when `extractRawTimeValues` is `true`
+/// - `INT96` → `Timestamp`, or `Long` epoch nanos when `extractRawTimeValues` is `true`
+/// - `INT32` + `DATE` → `LocalDate`, or `Integer` days-since-epoch when `extractRawTimeValues` is `true`
+/// - `INT32` + `TIME_MILLIS` → `LocalTime`, or `Integer` ms-since-midnight when `extractRawTimeValues` is `true`
+/// - `INT64` + `TIME_MICROS` / `TIME_NANOS` → `LocalTime`, or `Long` value-since-midnight in the column's
+///   declared unit when `extractRawTimeValues` is `true`
+/// - `FIXED_LEN_BYTE_ARRAY(16)` + `UUID` → `java.util.UUID`
 ///
-/// [ParquetNativeRecordExtractorConfig#isExtractRawTimeValues] opts out of TIMESTAMP / DATE / TIME conversion.
-/// DECIMAL and UUID always convert.
-///
-/// **Multi-value:** `LIST`-annotated group (standard 3-level wrapper or legacy non-wrapper forms) → `Object[]`.
-///
-/// **Map / nested complex:** `MAP`-annotated group → `Map<Object, Object>`; plain non-annotated group →
-/// `Map<String, Object>` (struct).
-///
-/// **Null:** field with zero repetition count → `null`.
+/// **Complex types:**
+/// - `LIST`-annotated group (standard 3-level wrapper or legacy non-wrapper forms) → `Object[]`
+/// - `MAP`-annotated group → `Map<String, Object>` (keys stringified via [BaseRecordExtractor#stringifyMapKey])
+/// - plain non-annotated group → `Map<String, Object>` (struct)
+/// - field with zero repetition count → `null`
 public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
 
   private boolean _extractRawTimeValues;
@@ -90,10 +79,6 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
 
   @Override
   public GenericRow extract(Group from, GenericRow to) {
-    // Parquet values bypass `convert()` — `extractValue` produces contract-typed values directly (Number,
-    // Boolean, byte[], String, BigDecimal, LocalDate, LocalTime, Timestamp, plus Object[] / Map for nested
-    // complex types via recursive extractValue). The base's universal normalizations (Byte/Short widening,
-    // ByteBuffer slicing, BigInteger → BigDecimal) don't apply to anything Parquet produces.
     GroupType fromType = from.getType();
     if (_extractAll) {
       List<Type> fields = fromType.getFields();
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
index 57fed3b4894..2a4881ddca1 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
@@ -31,22 +31,20 @@ import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
-/// Extracts Pinot [GenericRow] from a ProtoBuf [Message]. Conversion is dispatched directly off the field's
-/// [Descriptors.FieldDescriptor] (`isMapField` / `isRepeated` / `getJavaType`) — no [BaseRecordExtractor#convert]
-/// pipeline, no per-value wrapper allocation.
+/// Extracts Pinot [GenericRow] from a ProtoBuf [Message].
 ///
 /// **Proto source type → Java input → Java output type:**
-/// - proto `bool` → `Boolean` → `Boolean`
-/// - proto `int32` / `sint32` / `sfixed32` / `uint32` / `fixed32` → `Integer` 
→ `Integer`
-/// - proto `int64` / `sint64` / `sfixed64` / `uint64` / `fixed64` → `Long` → 
`Long`
-/// - proto `float` → `Float` → `Float`
-/// - proto `double` → `Double` → `Double`
-/// - proto `string` → `String` → `String`
-/// - proto `bytes` → [ByteString] → `byte[]` (via [ByteString#toByteArray])
-/// - proto `enum` → [Descriptors.EnumValueDescriptor] → enum constant name `String`
-/// - proto nested `message` → [Message] → `Map<String, Object>` (recursive over the message's set fields)
-/// - proto `repeated X` → `List<X>` → `Object[]` (each element recursively converted)
-/// - proto `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys stringified via
+/// - `bool` → `Boolean` → `Boolean`
+/// - `int32` / `sint32` / `sfixed32` / `uint32` / `fixed32` → `Integer` → `Integer`
+/// - `int64` / `sint64` / `sfixed64` / `uint64` / `fixed64` → `Long` → `Long`
+/// - `float` → `Float` → `Float`
+/// - `double` → `Double` → `Double`
+/// - `string` → `String` → `String`
+/// - `bytes` → [ByteString] → `byte[]` (via [ByteString#toByteArray])
+/// - `enum` → [Descriptors.EnumValueDescriptor] → enum constant name `String`
+/// - `message` → [Message] → `Map<String, Object>` (recursive over the message's set fields)
+/// - `repeated X` → `List<X>` → `Object[]` (each element recursively converted)
+/// - `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys stringified via
 ///   [BaseRecordExtractor#stringifyMapKey], values recursively converted)
 /// - proto3 `optional` field that is unset / cleared → `null`
 public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> {
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
index 6716d4c82dc..99da5094414 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.plugin.inputformat.thrift;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
@@ -33,19 +36,19 @@ import org.apache.thrift.meta_data.FieldMetaData;
 /// Extracts Pinot [GenericRow] from a Thrift-generated [TBase] via `getFieldValue(fieldForId(...))`.
 ///
 /// **Thrift source type → Java input → Java output type:**
-/// - thrift `bool` → `Boolean` → `Boolean`
-/// - thrift `i8` → `Byte` → `Integer` (widened by base)
-/// - thrift `i16` → `Short` → `Integer` (widened by base)
-/// - thrift `i32` → `Integer` → `Integer`
-/// - thrift `i64` → `Long` → `Long`
-/// - thrift `double` → `Double` → `Double`
-/// - thrift `string` → `String` → `String`
-/// - thrift `binary` → `ByteBuffer` → `byte[]`
-/// - thrift `enum` → `TEnum` → enum name `String` (via `toString()`)
-/// - thrift nested `struct` → [TBase] → `Map<String, Object>`
-/// - thrift `list<X>` / `set<X>` → `List<X>` / `Set<X>` → `Object[]`
-/// - thrift `map<K, V>` → `Map<K, V>` → `Map<Object, Object>`
-/// - thrift unset optional field → `null`
+/// - `bool` → `Boolean` → `Boolean`
+/// - `i8` → `Byte` → `Integer` (widened)
+/// - `i16` → `Short` → `Integer` (widened)
+/// - `i32` → `Integer` → `Integer`
+/// - `i64` → `Long` → `Long`
+/// - `double` → `Double` → `Double`
+/// - `string` → `String` → `String`
+/// - `binary` → `ByteBuffer` → `byte[]`
+/// - `enum` → `TEnum` → enum name `String` (via `toString()`)
+/// - `struct` → [TBase] → `Map<String, Object>`
+/// - `list<X>` / `set<X>` → `List<X>` / `Set<X>` → `Object[]`
+/// - `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys stringified via [BaseRecordExtractor#stringifyMapKey])
+/// - unset optional field → `null`
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class ThriftRecordExtractor extends BaseRecordExtractor<TBase> {
 
@@ -63,50 +66,98 @@ public class ThriftRecordExtractor extends BaseRecordExtractor<TBase> {
     if (_extractAll) {
       for (Map.Entry<String, Integer> nameToId : _fieldIds.entrySet()) {
         Object value = from.getFieldValue(from.fieldForId(nameToId.getValue()));
-        if (value != null) {
-          value = convert(value);
-        }
-        to.putValue(nameToId.getKey(), value);
+        to.putValue(nameToId.getKey(), value != null ? convert(value) : null);
       }
     } else {
       for (String fieldName : _fields) {
         Integer fieldId = _fieldIds.get(fieldName);
         if (fieldId != null) {
           Object value = from.getFieldValue(from.fieldForId(fieldId));
-          if (value != null) {
-            value = convert(value);
-          }
-          to.putValue(fieldName, value);
+          to.putValue(fieldName, value != null ? convert(value) : null);
         }
       }
     }
     return to;
   }
 
-  /**
-   * Returns whether the object is a Thrift object.
-   */
-  @Override
-  protected boolean isRecord(Object value) {
-    return value instanceof TBase;
+  /// Dispatches a non-null Thrift value off its runtime Java type: `Object[]` for `Collection` (list / set),
+  /// `Map<String, Object>` for `Map` and for nested `TBase` records, single-value normalization for scalars.
+  private static Object convert(Object value) {
+    // List
+    if (value instanceof Collection) {
+      return convertCollection((Collection<Object>) value);
+    }
+    // Map
+    if (value instanceof Map) {
+      return convertMap((Map<Object, Object>) value);
+    }
+    // Record
+    if (value instanceof TBase) {
+      return convertRecord((TBase) value);
+    }
+    // Single value
+    return convertSingleValue(value);
   }
 
-  /**
-   * Handles the conversion of each field of a Thrift object.
-   *
-   * @param value should be verified to be a Thrift TBase type prior to calling this method as it will be casted
-   *              without checking
-   */
-  @Override
-  protected Map<String, Object> convertRecord(Object value) {
-    TBase record = (TBase) value;
+  private static Object[] convertCollection(Collection<Object> collection) {
+    Object[] result = new Object[collection.size()];
+    int i = 0;
+    for (Object value : collection) {
+      result[i++] = value != null ? convert(value) : null;
+    }
+    return result;
+  }
+
+  /// Converts a `map<K, V>`. Keys flow through `convertSingleValue` (Thrift's allowed key types — bool /
+  /// i8 / i16 / i32 / i64 / double / string / binary — cover the same matrix), then are stringified via
+  /// [BaseRecordExtractor#stringifyMapKey] per the `Map<String, Object>` contract.
+  private static Map<String, Object> convertMap(Map<Object, Object> map) {
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+    for (Map.Entry<Object, Object> entry : map.entrySet()) {
+      Object key = entry.getKey();
+      if (key == null) {
+        continue;
+      }
+      Object convertedKey = convertSingleValue(key);
+      if (convertedKey == null) {
+        continue;
+      }
+      Object value = entry.getValue();
+      result.put(stringifyMapKey(convertedKey), value != null ? convert(value) : null);
+    }
+    return result;
+  }
+
+  private static Map<String, Object> convertRecord(TBase record) {
     Set<TFieldIdEnum> fields = FieldMetaData.getStructMetaDataMap(record.getClass()).keySet();
-    Map<String, Object> convertedRecord = Maps.newHashMapWithExpectedSize(fields.size());
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(fields.size());
     for (TFieldIdEnum field : fields) {
-      Object fieldValue = record.getFieldValue(field);
-      Object convertedValue = fieldValue != null ? convert(fieldValue) : null;
-      convertedRecord.put(field.getFieldName(), convertedValue);
+      Object value = record.getFieldValue(field);
+      result.put(field.getFieldName(), value != null ? convert(value) : null);
+    }
+    return result;
+  }
+
+  /// Single-value normalization for Thrift's scalar Java types: `Byte` / `Short` widen to `Integer`,
+  /// `ByteBuffer` materializes to `byte[]` (slice-safely so the source buffer's position is not advanced),
+  /// other `Number` / `Boolean` / `byte[]` pass through, anything else (e.g. `TEnum`) → `toString()`.
+  @VisibleForTesting
+  static Object convertSingleValue(Object value) {
+    if (value instanceof Number) {
+      if (value instanceof Byte || value instanceof Short) {
+        return ((Number) value).intValue();
+      }
+      return value;
+    }
+    if (value instanceof Boolean || value instanceof byte[]) {
+      return value;
+    }
+    if (value instanceof ByteBuffer) {
+      ByteBuffer slice = ((ByteBuffer) value).slice();
+      byte[] bytes = new byte[slice.limit()];
+      slice.get(bytes);
+      return bytes;
     }
-    return convertedRecord;
+    return value.toString();
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
index 8a3ab442c75..81452e13925 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.inputformat.thrift;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +34,10 @@ import static org.testng.Assert.assertNull;
 
 
 /// Tests [ThriftRecordExtractor] — see its class Javadoc for the thrift source type → Java output type
-/// matrix. Out of scope: thrift `i8` / `binary` are not declared in `complex_types.thrift`; they follow the
-/// same `convert` contract — mirror the pattern below if added.
+/// matrix. Out of scope: thrift `i8` is not declared in `complex_types.thrift`; it follows the same
+/// `convert` contract — mirror the pattern below if added. Thrift `binary` is exercised against the
+/// extractor's static helper [ThriftRecordExtractor#convertSingleValue] (the test schemas have no `binary`
+/// field, so end-to-end coverage isn't possible).
 public class ThriftRecordExtractorTest {
 
   // === Single value — order follows the type list in the class Javadoc ===
@@ -95,7 +98,7 @@ public class ThriftRecordExtractorTest {
 
   @Test
   public void testEnumExtractedAsString() {
-    // BaseRecordExtractor.convertSingleValue falls back to toString() for unknown types; TestEnum.toString()
+    // The extractor's single-value path falls back to toString() for unknown types; TestEnum.toString()
     // returns the enum constant name.
     ComplexTypes record = baseRecord();
     record.setEnumField(TestEnum.GAMMA);
@@ -103,6 +106,24 @@ public class ThriftRecordExtractorTest {
     assertEquals(result, "GAMMA");
   }
 
+  @Test
+  public void testBinaryByteBufferExtractedAsByteArray() {
+    // Thrift surfaces `binary` as `ByteBuffer`; the extractor materializes to `byte[]`.
+    Object result = ThriftRecordExtractor.convertSingleValue(ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    assertEquals((byte[]) result, new byte[]{1, 2, 3});
+  }
+
+  @Test
+  public void testBinaryByteBufferSliceDoesNotMutateOriginal() {
+    // The extractor must read only the remaining bytes and leave the source buffer's position untouched
+    // (the underlying buffer is owned by the reader and may be reused).
+    ByteBuffer buf = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4});
+    buf.position(2);
+    Object result = ThriftRecordExtractor.convertSingleValue(buf);
+    assertEquals((byte[]) result, new byte[]{2, 3, 4});
+    assertEquals(buf.position(), 2);
+  }
+
   @Test
   public void testNullForUnsetOptionalField() {
     // optionalStringField is left unset; thrift returns null for unset optional fields.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
index 6ee53815d32..f59f51ac20c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
@@ -18,46 +18,16 @@
  */
 package org.apache.pinot.spi.data.readers;
 
-import com.google.common.collect.Maps;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.Base64;
-import java.util.Collection;
-import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ArrayUtils;
 
 
-/// Default [RecordExtractor] implementation. Subclasses override only the bits the format needs.
-///
-/// [#convert] dispatches by shape:
-/// - multi-value (`Collection` / non-`byte[]` array) → [#convertMultiValue] → `Object[]` (each element recursed)
-/// - map → [#convertMap] → `Map<String, Object>` (keys converted via [#convertSingleValue] then stringified via
-///   [#stringifyMapKey], each value recursed)
-/// - nested record → [#convertRecord] (throws by default; override for formats with nested records)
-/// - everything else → [#convertSingleValue]
-///
-/// [#convertSingleValue] applies universal normalizations for primitive types that any format extractor might
-/// produce:
-/// - `Byte` / `Short` widen to `Integer` so all small ints unify behind a single Pinot type
-/// - `BigInteger` widens to `BigDecimal` (Pinot has no `BigInteger` data type; downstream transforms handle
-///   `BigDecimal` natively)
-/// - other `Number` (`Integer` / `Long` / `Float` / `Double` / `BigDecimal`) passes through
-/// - `Boolean` passes through
-/// - `byte[]` passes through
-/// - `ByteBuffer` materializes to `byte[]` (sliced so the source buffer's position is not advanced)
-/// - everything else falls back to `value.toString()`
-///
-/// **Logical types (DECIMAL / TIMESTAMP / DATE / TIME / UUID) are NOT handled here** — see [RecordExtractor]
-/// for the contract. Format-specific extractors do the native-to-contract conversion themselves (e.g. the
-/// Avro extractor walks the schema in its own `extract` and never reaches this dispatcher).
+/// Default [RecordExtractor] base providing include-list resolution via [#init] and the [#stringifyMapKey] helper.
 ///
 /// @param <T> the format of the input record
-@SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
 
   /// Include-list resolved from [#init]'s `fields` argument: empty when [#_extractAll] is `true`, otherwise the
@@ -92,120 +62,6 @@ public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
   protected void initConfig(@Nullable RecordExtractorConfig config) {
   }
 
-  /// Walks `value` through the dispatcher described on [BaseRecordExtractor] and returns the
-  /// contract-shaped result (single value, `Object[]`, or `Map<Object, Object>`). Subclasses call this
-  /// from their `extract` loop when the input is already a contract-friendly Java type tree
-  /// (`Collection` / `Map` / nested record). Schema-driven subclasses (Avro, Parquet native) bypass this
-  /// path entirely and produce contract values inline.
-  ///
-  /// The base impl never returns `null`; format-specific overrides may translate format-native null
-  /// sentinels by returning `null`. Java `null` inputs are short-circuited by callers and never reach
-  /// this method.
-  @Nullable
-  protected Object convert(Object value) {
-    Object convertedValue;
-    if (isMultiValue(value)) {
-      convertedValue = convertMultiValue(value);
-    } else if (isMap(value)) {
-      convertedValue = convertMap(value);
-    } else if (isRecord(value)) {
-      convertedValue = convertRecord(value);
-    } else {
-      convertedValue = convertSingleValue(value);
-    }
-    return convertedValue;
-  }
-
-  /// Whether `value` is multi-value. Default: `Collection` or non-`byte[]` array.
-  protected boolean isMultiValue(Object value) {
-    return value instanceof Collection || (value.getClass().isArray() && !(value instanceof byte[]));
-  }
-
-  /// Whether `value` is a map. Default: `instanceof Map`.
-  protected boolean isMap(Object value) {
-    return value instanceof Map;
-  }
-
-  /// Whether `value` is a nested record. Default `false`; override for formats with nested record types.
-  protected boolean isRecord(Object value) {
-    return false;
-  }
-
-  /// Converts a multi-value ([Collection] / `Object[]` / primitive array) into `Object[]`, recursing on each
-  /// element via [#convert]. The base impl never returns `null`; the return is marked nullable so format-specific
-  /// overrides retain the option to translate format-native null sentinels (e.g. an empty list interpreted as
-  /// `null`).
-  @Nullable
-  protected Object[] convertMultiValue(Object value) {
-    if (value instanceof Collection) {
-      return convertCollection((Collection) value);
-    }
-    if (value instanceof Object[]) {
-      return convertArray((Object[]) value);
-    }
-    return convertPrimitiveArray(value);
-  }
-
-  protected Object[] convertCollection(Collection collection) {
-    Object[] convertedValues = new Object[collection.size()];
-    int index = 0;
-    for (Object value : collection) {
-      convertedValues[index++] = value != null ? convert(value) : null;
-    }
-    return convertedValues;
-  }
-
-  protected Object[] convertArray(Object[] array) {
-    int numValues = array.length;
-    Object[] convertedValues = new Object[numValues];
-    for (int i = 0; i < numValues; i++) {
-      Object value = array[i];
-      convertedValues[i] = value != null ? convert(value) : null;
-    }
-    return convertedValues;
-  }
-
-  protected Object[] convertPrimitiveArray(Object array) {
-    if (array instanceof int[]) {
-      return ArrayUtils.toObject((int[]) array);
-    }
-    if (array instanceof long[]) {
-      return ArrayUtils.toObject((long[]) array);
-    }
-    if (array instanceof float[]) {
-      return ArrayUtils.toObject((float[]) array);
-    }
-    if (array instanceof double[]) {
-      return ArrayUtils.toObject((double[]) array);
-    }
-    throw new IllegalArgumentException("Unsupported primitive array type: " + array.getClass().getName());
-  }
-
-  /// Converts a map, recursing on each value via [#convert]. Keys go through [#convertSingleValue] (so format-native
-  /// raw types like `ByteBuffer` / `Byte` / `BigInteger` reach the contract form expected by [#stringifyMapKey])
-  /// and are then stringified per the `Map<String, Object>` contract. Entries with a `null` key — either input or
-  /// post-conversion (a format-specific override may translate the key to `null` for a format-native null sentinel)
-  /// — are dropped. The base impl never returns `null`; the return is marked nullable so format-specific overrides
-  /// retain the option to translate format-native null sentinels.
-  @Nullable
-  protected Map<String, Object> convertMap(Object value) {
-    Map<Object, Object> map = (Map) value;
-    Map<String, Object> convertedMap = Maps.newHashMapWithExpectedSize(map.size());
-    for (Map.Entry<Object, Object> entry : map.entrySet()) {
-      Object mapKey = entry.getKey();
-      if (mapKey == null) {
-        continue;
-      }
-      Object convertedKey = convertSingleValue(mapKey);
-      if (convertedKey == null) {
-        continue;
-      }
-      Object mapValue = entry.getValue();
-      convertedMap.put(stringifyMapKey(convertedKey), mapValue != null ? convert(mapValue) : null);
-    }
-    return convertedMap;
-  }
-
   /// Stringifies a map key per the `RecordExtractor` `Map<String, Object>` contract. Single source of truth
   /// for map-key stringification across every format extractor:
   /// - `byte[]` → base64 (matches Jackson's `byte[]` value serialization, so a serialized map reads
@@ -226,41 +82,4 @@ public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
     }
     return key.toString();
   }
-
-  /// Converts a nested record into a `Map<String, Object>`. Default throws — override in formats that have nested
-  /// records (Avro `GenericRecord`, Thrift `TBase`, ProtoBuf nested `Message`). Marked nullable so format-specific
-  /// overrides retain the option to translate format-native null sentinels.
-  @Nullable
-  protected Map<String, Object> convertRecord(Object value) {
-    throw new UnsupportedOperationException(
-        getClass().getSimpleName() + " does not support nested records; override convertRecord() to enable.");
-  }
-
-  /// Applies the universal primitive normalizations documented on [BaseRecordExtractor]. Format-specific extractors
-  /// override this to add native-type handling (e.g. Avro `Instant` → `Timestamp`, ProtoBuf `EnumValueDescriptor`
-  /// → `String`). The base impl never returns `null`; the return is marked nullable so format-specific overrides
-  /// retain the option to translate format-native null sentinels.
-  @Nullable
-  protected Object convertSingleValue(Object value) {
-    if (value instanceof Number) {
-      if (value instanceof Byte || value instanceof Short) {
-        return ((Number) value).intValue();
-      }
-      if (value instanceof BigInteger) {
-        return new BigDecimal((BigInteger) value);
-      }
-      return value;
-    }
-    if (value instanceof Boolean || value instanceof byte[]) {
-      return value;
-    }
-    if (value instanceof ByteBuffer) {
-      // ByteBuffer might be reused by the reader. Slice to avoid advancing the original buffer's position.
-      ByteBuffer slice = ((ByteBuffer) value).slice();
-      byte[] bytesValue = new byte[slice.limit()];
-      slice.get(bytesValue);
-      return bytesValue;
-    }
-    return value.toString();
-  }
 }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
index 5d0076b232c..4396117c4e7 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
@@ -19,362 +19,77 @@
 package org.apache.pinot.spi.data.readers;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Set;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
 
 
-/// Tests what [BaseRecordExtractor] still owns after format-specific extractors took over their own type
-/// conversions:
-/// - **Universal normalizations**: `Byte` / `Short` → `Integer`, [BigInteger] → [BigDecimal], `ByteBuffer` →
-///   `byte[]` (with slice-safety guarantee); passthrough for other `Number` / `Boolean` / `byte[]` / `String`.
-/// - **Recursive walker** for nested complex inputs that format extractors with `Collection` / `Map` / nested
-///   record shapes (Avro, Thrift, JSON) lean on:
-///   - `Collection` / `Object[]` / primitive `int[]` / `long[]` / `float[]` / `double[]` → `Object[]`;
-///     unsupported primitive arrays (`boolean[]` / `short[]` / `char[]`) throw.
-///   - `Map`: keys flow through `convertSingleValue`; values recursively converted.
-///   - Nested record: default `convertRecord` throws unless the subclass overrides it.
-/// - **Fallback**: any other type → `value.toString()`.
-///
-/// Date / time types are NOT exercised here — DATE / TIME passthrough and TIMESTAMP narrowing are per-format
-/// (see e.g. `AvroRecordExtractorTest`).
 public class BaseRecordExtractorTest {
 
-  // === Single-value — order follows the type list in the class Javadoc ===
+  // === init / include-list resolution ===
 
   @Test
-  void testBooleanPreserved() {
-    Object trueResult = extract(true);
-    assertEquals(trueResult, true);
-    Object falseResult = extract(false);
-    assertEquals(falseResult, false);
+  public void testInitNullFieldsExtractsAll() {
+    NoOpExtractor extractor = new NoOpExtractor();
+    extractor.init(null, null);
+    assertTrue(extractor._extractAll);
+    assertEquals(extractor._fields, Set.of());
   }
 
   @Test
-  void testByteWidenedToInteger() {
-    Object result = extract((byte) 42);
-    assertEquals(result, 42);
+  public void testInitEmptyFieldsExtractsAll() {
+    NoOpExtractor extractor = new NoOpExtractor();
+    extractor.init(Set.of(), null);
+    assertTrue(extractor._extractAll);
+    assertEquals(extractor._fields, Set.of());
   }
 
   @Test
-  void testShortWidenedToInteger() {
-    Object result = extract((short) 42);
-    assertEquals(result, 42);
-  }
-
-  @Test
-  void testIntegerPreserved() {
-    Object result = extract(42);
-    assertEquals(result, 42);
-  }
-
-  @Test
-  void testLongPreserved() {
-    Object result = extract(42L);
-    assertEquals(result, 42L);
-  }
-
-  @Test
-  void testFloatPreserved() {
-    Object result = extract(1.5f);
-    assertEquals(result, 1.5f);
-  }
-
-  @Test
-  void testDoublePreserved() {
-    Object result = extract(1.5d);
-    assertEquals(result, 1.5d);
-  }
-
-  @Test
-  void testBigDecimalPreserved() {
-    Object result = extract(new BigDecimal("123.45"));
-    assertEquals(result, new BigDecimal("123.45"));
-  }
-
-  @Test
-  void testBigIntegerWidenedToBigDecimal() {
-    // Pinot has no `BigInteger` type; the base widens to `BigDecimal` so downstream transforms can handle it.
-    Object result = extract(new BigInteger("12345678901234567890123456789"));
-    assertEquals(result, new BigDecimal("12345678901234567890123456789"));
-  }
-
-  @Test
-  void testStringPreserved() {
-    Object result = extract("hello");
-    assertEquals(result, "hello");
-  }
-
-  // === byte[] / ByteBuffer ===
-
-  @Test
-  void testByteArrayPreserved() {
-    Object result = extract(new byte[]{1, 2, 3});
-    assertEquals((byte[]) result, new byte[]{1, 2, 3});
-  }
-
-  @Test
-  void testByteBufferConvertedToByteArray() {
-    Object result = extract(ByteBuffer.wrap(new byte[]{1, 2, 3}));
-    assertEquals((byte[]) result, new byte[]{1, 2, 3});
-  }
-
-  @Test
-  void testByteBufferSliceDoesNotMutateOriginal() {
-    // The extractor must read only the remaining bytes and leave the source buffer's position untouched.
-    ByteBuffer buf = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4});
-    buf.position(2);
-    Object result = extract(buf);
-    assertEquals((byte[]) result, new byte[]{2, 3, 4});
-    assertEquals(buf.position(), 2);
-  }
-
-
-  // === Fallback / null ===
-
-  @Test
-  void testNonStandardObjectFallsBackToToString() {
-    Object result = extract(new StringBuilder("hello"));
-    assertEquals(result, "hello");
-  }
-
-  // === Multi-value (Collection / Object[] / primitive array) → Object[] ===
-
-  @Test
-  void testListExtractedAsArray() {
-    Object[] result = (Object[]) extract(List.of(1, 2, 3));
-    assertEquals(result, new Object[]{1, 2, 3});
-  }
-
-  @Test
-  void testObjectArrayExtractedAsArray() {
-    Object[] result = (Object[]) extract(new Object[]{1, "a", true});
-    assertEquals(result, new Object[]{1, "a", true});
-  }
-
-  @Test
-  void testIntPrimitiveArrayExtractedAsArray() {
-    Object[] result = (Object[]) extract(new int[]{10, 20, 30});
-    assertEquals(result, new Object[]{10, 20, 30});
-  }
-
-  @Test
-  void testLongPrimitiveArrayExtractedAsArray() {
-    Object[] result = (Object[]) extract(new long[]{10L, 20L, 30L});
-    assertEquals(result, new Object[]{10L, 20L, 30L});
-  }
-
-  @Test
-  void testFloatPrimitiveArrayExtractedAsArray() {
-    Object[] result = (Object[]) extract(new float[]{1.5f, 2.5f});
-    assertEquals(result, new Object[]{1.5f, 2.5f});
-  }
-
-  @Test
-  void testDoublePrimitiveArrayExtractedAsArray() {
-    Object[] result = (Object[]) extract(new double[]{1.5d, 2.5d});
-    assertEquals(result, new Object[]{1.5d, 2.5d});
-  }
-
-  @Test
-  void testUnsupportedPrimitiveArrayThrows() {
-    expectThrows(IllegalArgumentException.class, () -> extract(new char[]{'a', 'b'}));
-  }
-
-  @Test
-  void testListPreservesNullElements() {
-    // Shape preservation: null elements stay null in `Object[]`.
-    Object[] result = (Object[]) extract(Arrays.asList(1, null, 3));
-    assertEquals(result, new Object[]{1, null, 3});
-  }
-
-  @Test
-  void testObjectArrayPreservesNullElements() {
-    Object[] result = (Object[]) extract(new Object[]{1, null, 3});
-    assertEquals(result, new Object[]{1, null, 3});
-  }
-
-  @Test
-  void testEmptyListExtractedAsEmptyArray() {
-    Object[] result = (Object[]) extract(List.of());
-    assertEquals(result, new Object[]{});
-  }
-
-  @Test
-  void testEmptyObjectArrayExtractedAsEmptyArray() {
-    Object[] result = (Object[]) extract(new Object[]{});
-    assertEquals(result, new Object[]{});
-  }
-
-  // === Map (recursively converted) ===
-
-  @Test
-  void testMapWithStringKeys() {
-    Map<?, ?> result = (Map<?, ?>) extract(Map.of(
-        "k1", 1,
-        "k2", "foo"
-    ));
-    assertEquals(result.size(), 2);
-    assertEquals(result.get("k1"), 1);
-    assertEquals(result.get("k2"), "foo");
-  }
-
-  @Test
-  void testMapValuesRecursivelyConverted() {
-    // Inner List values become Object[]; inner Short values widen to Integer.
-    Map<?, ?> result = (Map<?, ?>) extract(Map.of(
-        "list", List.of(1, 2),
-        "short", (short) 7
-    ));
-    assertEquals((Object[]) result.get("list"), new Object[]{1, 2});
-    assertEquals(result.get("short"), 7);
-  }
-
-  @Test
-  void testMapKeysRunThroughConvertSingleValue() {
-    // Keys flow through `convertSingleValue` before `stringifyMapKey`. ByteBuffer materializes to byte[] then
-    // base64-encodes; Byte/Short/BigInteger widen before `toString()`.
-    HashMap<Object, Object> input = new HashMap<>();
-    input.put(ByteBuffer.wrap(new byte[]{1, 2, 3}), "bytes");
-    input.put((byte) 7, "byte");
-    input.put((short) 8, "short");
-    input.put(new BigInteger("12345678901234567890"), "bigint");
-    Map<?, ?> result = (Map<?, ?>) extract(input);
-    assertEquals(result, Map.of(
-        "AQID", "bytes",
-        "7", "byte",
-        "8", "short",
-        "12345678901234567890", "bigint"
-    ));
-  }
-
-  @Test
-  void testMapPreservesNullValues() {
-    // Shape preservation: null values stay null (only null *keys* drop the entry).
-    HashMap<String, Object> input = new HashMap<>();
-    input.put("k1", 1);
-    input.put("k2", null);
-    Map<?, ?> result = (Map<?, ?>) extract(input);
-    assertEquals(result.size(), 2);
-    assertEquals(result.get("k1"), 1);
-    assertNull(result.get("k2"));
-    assertTrue(result.containsKey("k2"));
-  }
-
-  @Test
-  void testMapDropsNullKeyEntry() {
-    HashMap<Object, Object> input = new HashMap<>();
-    input.put("k1", 1);
-    input.put(null, "ignored");
-    Map<?, ?> result = (Map<?, ?>) extract(input);
-    assertEquals(result, Map.of("k1", 1));
-  }
-
-  @Test
-  void testMapDropsEntryWhenConvertSingleValueReturnsNull() {
-    // Format-specific overrides may translate a non-null Java key to `null` (a format-native null sentinel).
-    // The entry must be dropped — converted-key check, not just input-key check.
-    Map<?, ?> result = (Map<?, ?>) new SentinelKeyExtractor().convert(Map.of(
-        "k1", 1,
-        "__SENTINEL__", 2
-    ));
-    assertEquals(result, Map.of("k1", 1));
-  }
-
-  @Test
-  void testEmptyMapExtractedAsEmptyMap() {
-    Map<?, ?> result = (Map<?, ?>) extract(Map.of());
-    assertEquals(result.size(), 0);
+  public void testInitNonEmptyFieldsRestrictsToIncludeList() {
+    NoOpExtractor extractor = new NoOpExtractor();
+    extractor.init(Set.of("a", "b"), null);
+    assertFalse(extractor._extractAll);
+    assertEquals(extractor._fields, Set.of("a", "b"));
   }
 
   // === stringifyMapKey — shared helper for the Map<String, Object> contract ===
 
   @Test
-  void testStringifyMapKeyByteArrayBase64Encoded() {
+  public void testStringifyMapKeyByteArrayBase64Encoded() {
     assertEquals(BaseRecordExtractor.stringifyMapKey(new byte[]{0, 1, 2, 3}), "AAECAw==");
   }
 
   @Test
-  void testStringifyMapKeyTimestampUsesIsoUtcWithNanos() {
-    java.sql.Timestamp ts = new java.sql.Timestamp(1700000000123L);
+  public void testStringifyMapKeyTimestampUsesIsoUtcWithNanos() {
+    Timestamp ts = new Timestamp(1700000000123L);
     ts.setNanos(123_456_789);
     // ISO-8601 UTC, JVM-TZ-stable, full nanosecond precision preserved.
     assertEquals(BaseRecordExtractor.stringifyMapKey(ts), "2023-11-14T22:13:20.123456789Z");
   }
 
   @Test
-  void testStringifyMapKeyOtherTypesUseToString() {
+  public void testStringifyMapKeyOtherTypesUseToString() {
     assertEquals(BaseRecordExtractor.stringifyMapKey("k"), "k");
     assertEquals(BaseRecordExtractor.stringifyMapKey(42), "42");
     assertEquals(BaseRecordExtractor.stringifyMapKey(123L), "123");
     assertEquals(BaseRecordExtractor.stringifyMapKey(true), "true");
     assertEquals(BaseRecordExtractor.stringifyMapKey(new BigDecimal("1.50")), "1.50");
-    assertEquals(BaseRecordExtractor.stringifyMapKey(java.time.LocalDate.of(2024, 1, 1)), "2024-01-01");
-    assertEquals(BaseRecordExtractor.stringifyMapKey(java.time.LocalTime.of(8, 51, 32)), "08:51:32");
+    assertEquals(BaseRecordExtractor.stringifyMapKey(LocalDate.of(2024, 1, 1)), "2024-01-01");
+    assertEquals(BaseRecordExtractor.stringifyMapKey(LocalTime.of(8, 51, 32)), "08:51:32");
   }
 
-  // === Nested record — base throws unless overridden ===
-
-  @Test
-  void testNestedRecordThrowsByDefault() {
-    UnsupportedOperationException ex = expectThrows(UnsupportedOperationException.class,
-        () -> new RecordCapableExtractor().convert(new Object()));
-    assertEquals(ex.getMessage(),
-        "RecordCapableExtractor does not support nested records; override convertRecord() to enable.");
-  }
+  // === Helper ===
 
-  // === Helpers ===
-
-  private static Object extract(Object value) {
-    return new TestExtractor().convert(value);
-  }
-
-  /// Minimal subclass that doesn't override anything — exercises the default conversion paths in
-  /// [BaseRecordExtractor].
-  private static final class TestExtractor extends BaseRecordExtractor<Object> {
+  private static final class NoOpExtractor extends BaseRecordExtractor<Object> {
     @Override
     public GenericRow extract(Object from, GenericRow to) {
       throw new UnsupportedOperationException();
     }
   }
-
-  /// Subclass that returns `true` from [#isRecord] for any input but does NOT override `convertRecord` —
-  /// triggers the default `UnsupportedOperationException`.
-  private static final class RecordCapableExtractor extends BaseRecordExtractor<Object> {
-    @Override
-    public GenericRow extract(Object from, GenericRow to) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean isRecord(Object value) {
-      return true;
-    }
-  }
-
-  /// Subclass that returns `null` from [#convertSingleValue] for the `"__SENTINEL__"` key — simulates a format's
-  /// null-sentinel translation. Used to verify that `convertMap` drops entries whose post-conversion key is
-  /// `null`, not just entries whose input key is `null`.
-  private static final class SentinelKeyExtractor extends BaseRecordExtractor<Object> {
-    @Override
-    public GenericRow extract(Object from, GenericRow to) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Nullable
-    @Override
-    protected Object convertSingleValue(Object value) {
-      return "__SENTINEL__".equals(value) ? null : super.convertSingleValue(value);
-    }
-  }
 }

