This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b292c1ac4c Make record extractors self-contained; drop BaseRecordExtractor.convert dispatcher (#18436)
7b292c1ac4c is described below
commit 7b292c1ac4cf8d1c3e7bfe7d4fc01a8631d0b31f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu May 7 11:30:24 2026 -0700
Make record extractors self-contained; drop BaseRecordExtractor.convert dispatcher (#18436)
---
.../inputformat/avro/AvroRecordExtractor.java | 53 ++--
.../inputformat/clplog/CLPLogRecordExtractor.java | 86 +++---
.../clplog/CLPLogRecordExtractorTest.java | 47 +++
.../plugin/inputformat/csv/CSVRecordExtractor.java | 3 +-
.../inputformat/json/JSONRecordExtractor.java | 73 ++++-
.../inputformat/json/JSONRecordExtractorTest.java | 2 +-
.../plugin/inputformat/orc/ORCRecordExtractor.java | 4 +-
.../parquet/ParquetAvroRecordExtractor.java | 2 +-
.../parquet/ParquetNativeRecordExtractor.java | 49 +--
.../protobuf/ProtoBufRecordExtractor.java | 26 +-
.../inputformat/thrift/ThriftRecordExtractor.java | 133 +++++---
.../thrift/ThriftRecordExtractorTest.java | 27 +-
.../spi/data/readers/BaseRecordExtractor.java | 183 +----------
.../spi/data/readers/BaseRecordExtractorTest.java | 343 ++-------------------
14 files changed, 357 insertions(+), 674 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 43c086bd8f6..f27f4246d40 100644
---
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -44,39 +44,32 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-/// Extracts Pinot [GenericRow] from an Avro [GenericRecord]. A single
schema-driven walk produces values
-/// that satisfy the `RecordExtractor` contract directly — no second pass
through `BaseRecordExtractor#convert`.
+/// Extracts Pinot [GenericRow] from an Avro [GenericRecord].
///
/// **Avro source type → Java input → Java output type:**
-/// - avro `boolean` → `Boolean` → `Boolean`
-/// - avro `int` → `Integer` → `Integer`
-/// - avro `long` → `Long` → `Long`
-/// - avro `float` → `Float` → `Float`
-/// - avro `double` → `Double` → `Double`
-/// - avro `string` → `Utf8` → `String` (via `Utf8.toString()`)
-/// - avro `bytes` → `ByteBuffer` → `byte[]` (materialized from the buffer's
remaining content)
-/// - avro `fixed` → `GenericFixed` / `GenericData.Fixed` → `byte[]`
-/// - avro `enum` → `GenericData.EnumSymbol` → `String` (enum name)
-/// - avro `array<T>` → `List<T>` → `Object[]` (each element recursively
converted)
-/// - avro `map<string, T>` → `Map<Utf8, T>` → `Map<String, Object>` (each
value recursively converted)
-/// - avro nested `record` → `GenericRecord` → `Map<String, Object>`
-/// - avro `union[null, X]` with the `null` branch selected → `null`
+/// - `boolean` → `Boolean` → `Boolean`
+/// - `int` → `Integer` → `Integer`
+/// - `long` → `Long` → `Long`
+/// - `float` → `Float` → `Float`
+/// - `double` → `Double` → `Double`
+/// - `string` → `Utf8` → `String` (via `Utf8.toString()`)
+/// - `bytes` → `ByteBuffer` → `byte[]` (materialized from the buffer's
remaining content)
+/// - `fixed` → `GenericFixed` / `GenericData.Fixed` → `byte[]`
+/// - `enum` → `GenericData.EnumSymbol` → `String` (enum name)
+/// - `array<T>` → `List<T>` → `Object[]` (each element recursively converted)
+/// - `map<string, T>` → `Map<Utf8, T>` → `Map<String, Object>` (each value
recursively converted)
+/// - `record` → `GenericRecord` → `Map<String, Object>`
+/// - `union[null, X]` with the `null` branch selected → `null`
///
-/// **Logical types** (the reader emits the raw Avro physical type; this
extractor applies the conversion via
-/// [#CONVERSION_MAP] to produce the Pinot contract type):
-/// - `decimal` / `big-decimal` → raw `ByteBuffer` → [BigDecimal] (always
converted; raw bytes aren't interpretable
-/// without external precision/scale)
-/// - `timestamp-millis` → raw `Long` → [Timestamp], or `Long` raw epoch
millis when `extractRawTimeValues` is `true`
-/// - `timestamp-micros` → raw `Long` → [Timestamp] (sub-millisecond micros
preserved), or `Long` raw epoch micros when
-/// `extractRawTimeValues` is `true`
-/// - `timestamp-nanos` → raw `Long` → [Timestamp] (nanosecond precision
preserved), or `Long` raw epoch nanos when
-/// `extractRawTimeValues` is `true`
-/// - `date` → raw `Integer` → [LocalDate], or `Integer` raw days-since-epoch
when `extractRawTimeValues` is `true`
-/// - `time-millis` → raw `Integer` → [LocalTime], or `Integer` raw
ms-since-midnight when `extractRawTimeValues` is
-/// `true`
-/// - `time-micros` → raw `Long` → [LocalTime], or `Long` raw
µs-since-midnight when `extractRawTimeValues` is `true`
-/// - `uuid` → raw `Utf8` / `GenericFixed` → [UUID] (always converted;
downstream type transformer adapts to the Pinot
-/// column type — `STRING` column gets canonical UUID string, `BYTES` column
gets 16-byte big-endian form)
+/// **Logical types:**
+/// - `decimal` / `big-decimal` → `ByteBuffer` → [BigDecimal]
+/// - `timestamp-millis` → `Long` → [Timestamp], or `Long` epoch millis when
`extractRawTimeValues` is `true`
+/// - `timestamp-micros` → `Long` → [Timestamp], or `Long` epoch micros when
`extractRawTimeValues` is `true`
+/// - `timestamp-nanos` → `Long` → [Timestamp], or `Long` epoch nanos when
`extractRawTimeValues` is `true`
+/// - `date` → `Integer` → [LocalDate], or `Integer` days-since-epoch when
`extractRawTimeValues` is `true`
+/// - `time-millis` → `Integer` → [LocalTime], or `Integer` ms-since-midnight
when `extractRawTimeValues` is `true`
+/// - `time-micros` → `Long` → [LocalTime], or `Long` µs-since-midnight when
`extractRawTimeValues` is `true`
+/// - `uuid` → `Utf8` / `GenericFixed` → [UUID]
public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
private static final Conversion<BigDecimal> DECIMAL_CONVERSION = new
Conversions.DecimalConversion();
private static final Conversion<BigDecimal> BIG_DECIMAL_CONVERSION = new
Conversions.BigDecimalConversion();
diff --git
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index aebefbfe66f..06885200d34 100644
---
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -20,11 +20,15 @@ package org.apache.pinot.plugin.inputformat.clplog;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
import com.yscope.clp.compressorfrontend.EncodedMessage;
import com.yscope.clp.compressorfrontend.MessageEncoder;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -41,33 +45,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * A record extractor for log events. For configuration options, see {@link
CLPLogRecordExtractorConfig}. This is an
- * experimental feature.
- * <p></p>
- * The goal of this record extractor is to allow us to encode the fields
specified in
- * {@link CLPLogRecordExtractorConfig} using CLP. CLP is a compressor designed
to encode unstructured log messages in a
- * way that makes them more compressible. It does this by decomposing a
message into three fields:
- * <ul>
- * <li>the message's static text, called a log type;</li>
- * <li>repetitive variable values, called dictionary variables; and</li>
- * <li>non-repetitive variable values (called encoded variables since we
encode them specially if possible).</li>
- * </ul>
- * For instance, if the field "message" is encoded, then the extractor will
output:
- * <ul>
- * <li>message_logtype</li>
- * <li>message_dictionaryVars</li>
- * <li>message_encodedVars</li>
- * </ul>
- * All remaining fields are processed in the same way as they are in
- * {@link org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor}.
Specifically:
- * <ul>
- * <li>If the caller passed a set of fields to {@code init}, then only those
fields are extracted from each
- * record and any remaining fields are dropped.</li>
- * <li>Otherwise, all fields are extracted from each record.</li>
- * </ul>
- * This class' implementation is based on {@link
org.apache.pinot.plugin.inputformat.json.JSONRecordExtractor}.
- */
+/// Record extractor for log events. Experimental. For configuration options
see [CLPLogRecordExtractorConfig].
+///
+/// Each field configured for CLP encoding is split into three sibling fields
— `_logtype`,
+/// `_dictionaryVars`, `_encodedVars`. All other fields are extracted as plain
JSON values.
public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String,
Object>> {
// The maximum number of variables that can be stored in a cell (row of a
single column).
private static final int MAX_VARIABLES_PER_CELL =
ForwardIndexType.MAX_MULTI_VALUES_PER_ROW;
@@ -139,10 +120,7 @@ public class CLPLogRecordExtractor extends
BaseRecordExtractor<Map<String, Objec
if (clpEncodedFieldNames.contains(key)) {
encodeFieldWithClp(key, value, to);
} else {
- if (value != null) {
- value = convert(value);
- }
- to.putValue(key, value);
+ to.putValue(key, value != null ? convert(value) : null);
}
}
return to;
@@ -151,10 +129,7 @@ public class CLPLogRecordExtractor extends
BaseRecordExtractor<Map<String, Objec
// Handle un-encoded fields
for (String fieldName : _fields) {
Object value = from.get(fieldName);
- if (value != null) {
- value = convert(value);
- }
- to.putValue(fieldName, value);
+ to.putValue(fieldName, value != null ? convert(value) : null);
}
// Handle encoded fields
@@ -165,6 +140,47 @@ public class CLPLogRecordExtractor extends
BaseRecordExtractor<Map<String, Objec
return to;
}
+ /// Walks a non-null Jackson-parsed value and produces the contract shape: `BigDecimal` for `BigInteger`
+ /// (oversized ints), `Object[]` for JSON arrays, `Map<String, Object>` for JSON objects, pass-through for
+ /// the other Jackson scalar types.
+ private static Object convert(Object value) {
+ // BigInteger widens (Pinot has no BigInteger type)
+ if (value instanceof BigInteger) {
+ return new BigDecimal((BigInteger) value);
+ }
+ // List
+ if (value instanceof List) {
+ //noinspection unchecked
+ return convertList((List<Object>) value);
+ }
+ // Map
+ if (value instanceof Map) {
+ //noinspection unchecked
+ return convertMap((Map<String, Object>) value);
+ }
+ // Single value pass-through (Boolean / Integer / Long / Double / String)
+ return value;
+ }
+
+ private static Object[] convertList(List<Object> list) {
+ int n = list.size();
+ Object[] result = new Object[n];
+ for (int i = 0; i < n; i++) {
+ Object v = list.get(i);
+ result[i] = v != null ? convert(v) : null;
+ }
+ return result;
+ }
+
+ private static Map<String, Object> convertMap(Map<String, Object> map) {
+ Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ Object v = entry.getValue();
+ result.put(entry.getKey(), v != null ? convert(v) : null);
+ }
+ return result;
+ }
+
/**
* Encodes a field with CLP
* <p></p>
diff --git
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 055481f7282..559cfa3bcd4 100644
---
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -21,7 +21,11 @@ package org.apache.pinot.plugin.inputformat.clplog;
import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
import com.yscope.clp.compressorfrontend.MessageDecoder;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
@@ -88,6 +92,49 @@ public class CLPLogRecordExtractorTest {
assertNull(row.getValue(MESSAGE_1_FIELD_NAME +
ClpRewriter.LOGTYPE_COLUMN_SUFFIX));
}
+ // === Un-encoded field conversion (same dispatch as JSONRecordExtractor) ===
+
+ @Test
+ public void testBigIntegerWidenedToBigDecimal() {
+ // Default Jackson parses integer literals that overflow `Long` as
`BigInteger`. The extractor widens
+ // to `BigDecimal` since Pinot has no `BigInteger` type.
+ BigInteger value = new BigInteger("99999999999999999999999999");
+ GenericRow row = extractUnencoded("payload", value);
+ assertEquals(row.getValue("payload"), new BigDecimal(value));
+ }
+
+ @Test
+ public void testListExtractedAsArray() {
+ GenericRow row = extractUnencoded("payload", List.of(1, "a", true));
+ assertEquals((Object[]) row.getValue("payload"), new Object[]{1, "a",
true});
+ }
+
+ @Test
+ public void testNestedMapRecursivelyConverted() {
+ // Inner List values become Object[]; inner BigInteger widens to
BigDecimal.
+ GenericRow row = extractUnencoded("payload", Map.of(
+ "list", List.of(1, 2),
+ "big", new BigInteger("100")
+ ));
+ Map<?, ?> result = (Map<?, ?>) row.getValue("payload");
+ assertEquals((Object[]) result.get("list"), new Object[]{1, 2});
+ assertEquals(result.get("big"), new BigDecimal("100"));
+ }
+
+ /// Run the extractor with no CLP-encoded fields configured, so `payload`
flows through the un-encoded
+ /// path (the same `convert` dispatch as JSON).
+ private GenericRow extractUnencoded(String fieldName, Object value) {
+ CLPLogRecordExtractorConfig extractorConfig = new
CLPLogRecordExtractorConfig();
+ extractorConfig.init(Map.of());
+ CLPLogRecordExtractor extractor = new CLPLogRecordExtractor();
+ extractor.init(null, extractorConfig, TOPIC_NAME);
+ Map<String, Object> input = new HashMap<>();
+ input.put(fieldName, value);
+ GenericRow row = new GenericRow();
+ extractor.extract(input, row);
+ return row;
+ }
+
@Test
public void testPreserveTopicName() {
// Without the destination-column config, the topic is not surfaced.
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
index f24386a4bc4..c91843a4c04 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractor.java
@@ -62,8 +62,9 @@ public class CSVRecordExtractor extends
BaseRecordExtractor<CSVRecord> {
return to;
}
+ @Nullable
private Object convert(@Nullable String value) {
- if (value == null || StringUtils.isEmpty(value)) {
+ if (StringUtils.isEmpty(value)) {
return null;
// NOTE about CSV behavior for empty string e.g. foo,bar,,zoo or
foo,bar,"",zoo. These both are equivalent to a
// CSVParser
diff --git
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
index 4ac55d1bdba..d2fbe09da64 100644
---
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractor.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.plugin.inputformat.json;
+import com.google.common.collect.Maps;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -27,15 +31,15 @@ import org.apache.pinot.spi.data.readers.GenericRow;
/// no native bytes / float / big-decimal type.
///
/// **JSON source type → Java input → Java output type:**
-/// - JSON `true` / `false` → `Boolean` → `Boolean`
-/// - JSON int that fits in 32 bits → `Integer` → `Integer`
-/// - JSON int that overflows 32 bits but fits in 64 → `Long` → `Long`
-/// - JSON int that overflows 64 bits → `BigInteger` → `BigDecimal` (widened
by [BaseRecordExtractor])
-/// - JSON decimal → `Double` → `Double` (never `Float` or `BigDecimal` with
default Jackson config)
-/// - JSON string → `String` → `String`
-/// - JSON `null` → `null` → `null`
-/// - JSON array → `List` → `Object[]` (each element recursively converted)
-/// - JSON object → `Map` → `Map<String, Object>` (each value recursively
converted)
+/// - `true` / `false` → `Boolean` → `Boolean`
+/// - int that fits in 32 bits → `Integer` → `Integer`
+/// - int that overflows 32 bits but fits in 64 → `Long` → `Long`
+/// - int that overflows 64 bits → `BigInteger` → `BigDecimal` (Pinot has no
`BigInteger` data type)
+/// - decimal → `Double` → `Double` (never `Float` or `BigDecimal` with
default Jackson config)
+/// - string → `String` → `String`
+/// - `null` → `null` → `null`
+/// - array → `List` → `Object[]` (each element recursively converted)
+/// - object → `Map` → `Map<String, Object>` (each value recursively converted)
public class JSONRecordExtractor extends BaseRecordExtractor<Map<String,
Object>> {
@Override
@@ -43,20 +47,55 @@ public class JSONRecordExtractor extends
BaseRecordExtractor<Map<String, Object>
if (_extractAll) {
for (Map.Entry<String, Object> entry : from.entrySet()) {
Object value = entry.getValue();
- if (value != null) {
- value = convert(value);
- }
- to.putValue(entry.getKey(), value);
+ to.putValue(entry.getKey(), value != null ? convert(value) : null);
}
} else {
for (String fieldName : _fields) {
Object value = from.get(fieldName);
- if (value != null) {
- value = convert(value);
- }
- to.putValue(fieldName, value);
+ to.putValue(fieldName, value != null ? convert(value) : null);
}
}
return to;
}
+
+ /// Walks a non-null Jackson-parsed value and produces the contract shape: `BigDecimal` for `BigInteger`
+ /// (oversized ints), `Object[]` for JSON arrays, `Map<String, Object>` for JSON objects, pass-through for
+ /// the other Jackson scalar types (`Boolean`, `Integer`, `Long`, `Double`, `String`).
+ private static Object convert(Object value) {
+ // BigInteger widens (Pinot has no BigInteger type)
+ if (value instanceof BigInteger) {
+ return new BigDecimal((BigInteger) value);
+ }
+ // List
+ if (value instanceof List) {
+ //noinspection unchecked
+ return convertList((List<Object>) value);
+ }
+ // Map
+ if (value instanceof Map) {
+ //noinspection unchecked
+ return convertMap((Map<String, Object>) value);
+ }
+ // Single value pass-through (Boolean / Integer / Long / Double / String)
+ return value;
+ }
+
+ private static Object[] convertList(List<Object> list) {
+ int n = list.size();
+ Object[] result = new Object[n];
+ for (int i = 0; i < n; i++) {
+ Object v = list.get(i);
+ result[i] = v != null ? convert(v) : null;
+ }
+ return result;
+ }
+
+ private static Map<String, Object> convertMap(Map<String, Object> map) {
+ Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ Object v = entry.getValue();
+ result.put(entry.getKey(), v != null ? convert(v) : null);
+ }
+ return result;
+ }
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
index 1776466f30c..a67c42d6bfe 100644
---
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
@@ -61,7 +61,7 @@ public class JSONRecordExtractorTest {
@Test
public void testBigIntegerWidenedToBigDecimal() {
- // Default Jackson parses integer literals that overflow `Long` as
`BigInteger`. The base widens to
+ // Default Jackson parses integer literals that overflow `Long` as
`BigInteger`. The extractor widens to
// `BigDecimal` since Pinot has no `BigInteger` type.
BigInteger value = new BigInteger("99999999999999999999999999");
Object result = extract(value);
diff --git
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
index 5a98cb5414e..5967afc875d 100644
---
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractor.java
@@ -44,9 +44,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/// Extracts a single ORC row into a [GenericRow]. Input is an
[ORCRecordExtractor.Record] handle wrapping
-/// a [VectorizedRowBatch] + schema + row index. Dispatch happens in
[#extractValue] (complex types —
-/// `LIST` / `MAP` / `STRUCT`) and [#extractSingleValue] (primitives); ORC
values never flow through
-/// `convertSingleValue`, so widening / `Temporal` handling is done locally
here.
+/// a [VectorizedRowBatch] + schema + row index.
///
/// **ORC schema category → Java output type:**
/// - `BOOLEAN` → `Boolean`
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
index 309c9bb96df..48e6eddd739 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.utils.TimestampUtils;
/// The type matrix is inherited from [AvroRecordExtractor]; the only override
is the INT96 timestamp
-/// (which parquet-avro surfaces as `fixed(12)` with `doc = "INT96 represented
as byte[12]"`) → [java.sql.Timestamp]
+/// (which parquet-avro surfaces as `fixed(12)` with `doc = "INT96 represented
as byte[12]"`) → `Timestamp`
/// (or `Long` epoch nanos when `extractRawTimeValues` is `true`) via
[ParquetUtils#convertInt96ToEpochNanos].
public class ParquetAvroRecordExtractor extends AvroRecordExtractor {
private static final int INT96_BYTE_SIZE = 12;
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
index 7d557774f0c..3618f01d87c 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java
@@ -39,10 +39,9 @@ import org.apache.pinot.spi.utils.UuidUtils;
/// Extracts Pinot [GenericRow] from Parquet [Group] objects, using Parquet
[LogicalTypeAnnotation]s to drive
-/// LIST and MAP handling per the Parquet LogicalTypes spec
backward-compatibility rules. Output adheres to
-/// the `RecordExtractor` contract.
+/// LIST and MAP handling per the Parquet LogicalTypes spec
backward-compatibility rules.
///
-/// **Primitive types** — preserved as their boxed Java type:
+/// **Primitive types:**
/// - `BOOLEAN` → `Boolean`
/// - `INT32` → `Integer`
/// - `INT64` → `Long`
@@ -51,32 +50,22 @@ import org.apache.pinot.spi.utils.UuidUtils;
/// - `BINARY` with `STRING` / `ENUM` annotation → `String`
/// - `BINARY` / `FIXED_LEN_BYTE_ARRAY` without annotation → `byte[]`
///
-/// **Logical types** — Parquet logical-type annotations decoded into uniform
Java types regardless of the
-/// underlying physical encoding:
-/// - `DECIMAL` (on `INT32` / `INT64` / `BINARY` / `FIXED_LEN_BYTE_ARRAY`) →
`BigDecimal`. Always converted —
-/// raw bytes aren't interpretable without external precision / scale
-/// - `INT64` + `TIMESTAMP_MILLIS` / `MICROS` / `NANOS` → `Timestamp`
(sub-millisecond nanos preserved via
-/// `Timestamp#setNanos`), or `Long` raw value in the column's declared unit
when raw
-/// - `INT96` → `Timestamp` (sub-millisecond nanos preserved), or `Long` epoch
nanos when raw (nanos is
-/// INT96's natural unit since its physical encoding — nanos-of-day + Julian
day — carries nanosecond
-/// precision)
-/// - `INT32` + `DATE` → `LocalDate`, or `Integer` raw days-since-epoch when
raw
-/// - `INT32` + `TIME_MILLIS` → `LocalTime`, or `Integer` raw
ms-since-midnight when raw
-/// - `INT64` + `TIME_MICROS` / `TIME_NANOS` → `LocalTime` (full nanosecond
precision), or `Long` raw
-/// value-since-midnight in the column's declared unit when raw
-/// - `FIXED_LEN_BYTE_ARRAY(16)` + `UUID` → `java.util.UUID`. Always
converted; the downstream type
-/// transformer adapts to the Pinot column's storage type — `STRING` →
canonical form, `BYTES` →
-/// 16-byte big-endian form
+/// **Logical types:**
+/// - `DECIMAL` (on `INT32` / `INT64` / `BINARY` / `FIXED_LEN_BYTE_ARRAY`) →
`BigDecimal`
+/// - `INT64` + `TIMESTAMP_MILLIS` / `MICROS` / `NANOS` → `Timestamp`, or
`Long` in the column's declared unit
+/// when `extractRawTimeValues` is `true`
+/// - `INT96` → `Timestamp`, or `Long` epoch nanos when `extractRawTimeValues`
is `true`
+/// - `INT32` + `DATE` → `LocalDate`, or `Integer` days-since-epoch when
`extractRawTimeValues` is `true`
+/// - `INT32` + `TIME_MILLIS` → `LocalTime`, or `Integer` ms-since-midnight
when `extractRawTimeValues` is `true`
+/// - `INT64` + `TIME_MICROS` / `TIME_NANOS` → `LocalTime`, or `Long`
value-since-midnight in the column's
+/// declared unit when `extractRawTimeValues` is `true`
+/// - `FIXED_LEN_BYTE_ARRAY(16)` + `UUID` → `java.util.UUID`
///
-/// [ParquetNativeRecordExtractorConfig#isExtractRawTimeValues] opts out of
TIMESTAMP / DATE / TIME conversion.
-/// DECIMAL and UUID always convert.
-///
-/// **Multi-value:** `LIST`-annotated group (standard 3-level wrapper or
legacy non-wrapper forms) → `Object[]`.
-///
-/// **Map / nested complex:** `MAP`-annotated group → `Map<Object, Object>`;
plain non-annotated group →
-/// `Map<String, Object>` (struct).
-///
-/// **Null:** field with zero repetition count → `null`.
+/// **Complex types:**
+/// - `LIST`-annotated group (standard 3-level wrapper or legacy non-wrapper
forms) → `Object[]`
+/// - `MAP`-annotated group → `Map<String, Object>` (keys stringified via
[BaseRecordExtractor#stringifyMapKey])
+/// - plain non-annotated group → `Map<String, Object>` (struct)
+/// - field with zero repetition count → `null`
public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> {
private boolean _extractRawTimeValues;
@@ -90,10 +79,6 @@ public class ParquetNativeRecordExtractor extends
BaseRecordExtractor<Group> {
@Override
public GenericRow extract(Group from, GenericRow to) {
- // Parquet values bypass `convert()` — `extractValue` produces
contract-typed values directly (Number,
- // Boolean, byte[], String, BigDecimal, LocalDate, LocalTime, Timestamp,
plus Object[] / Map for nested
- // complex types via recursive extractValue). The base's universal
normalizations (Byte/Short widening,
- // ByteBuffer slicing, BigInteger → BigDecimal) don't apply to anything
Parquet produces.
GroupType fromType = from.getType();
if (_extractAll) {
List<Type> fields = fromType.getFields();
diff --git
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
index 57fed3b4894..2a4881ddca1 100644
---
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
@@ -31,22 +31,20 @@ import
org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
-/// Extracts Pinot [GenericRow] from a ProtoBuf [Message]. Conversion is
dispatched directly off the field's
-/// [Descriptors.FieldDescriptor] (`isMapField` / `isRepeated` /
`getJavaType`) — no [BaseRecordExtractor#convert]
-/// pipeline, no per-value wrapper allocation.
+/// Extracts Pinot [GenericRow] from a ProtoBuf [Message].
///
/// **Proto source type → Java input → Java output type:**
-/// - proto `bool` → `Boolean` → `Boolean`
-/// - proto `int32` / `sint32` / `sfixed32` / `uint32` / `fixed32` → `Integer`
→ `Integer`
-/// - proto `int64` / `sint64` / `sfixed64` / `uint64` / `fixed64` → `Long` →
`Long`
-/// - proto `float` → `Float` → `Float`
-/// - proto `double` → `Double` → `Double`
-/// - proto `string` → `String` → `String`
-/// - proto `bytes` → [ByteString] → `byte[]` (via [ByteString#toByteArray])
-/// - proto `enum` → [Descriptors.EnumValueDescriptor] → enum constant name
`String`
-/// - proto nested `message` → [Message] → `Map<String, Object>` (recursive
over the message's set fields)
-/// - proto `repeated X` → `List<X>` → `Object[]` (each element recursively
converted)
-/// - proto `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys
stringified via
+/// - `bool` → `Boolean` → `Boolean`
+/// - `int32` / `sint32` / `sfixed32` / `uint32` / `fixed32` → `Integer` →
`Integer`
+/// - `int64` / `sint64` / `sfixed64` / `uint64` / `fixed64` → `Long` → `Long`
+/// - `float` → `Float` → `Float`
+/// - `double` → `Double` → `Double`
+/// - `string` → `String` → `String`
+/// - `bytes` → [ByteString] → `byte[]` (via [ByteString#toByteArray])
+/// - `enum` → [Descriptors.EnumValueDescriptor] → enum constant name `String`
+/// - `message` → [Message] → `Map<String, Object>` (recursive over the
message's set fields)
+/// - `repeated X` → `List<X>` → `Object[]` (each element recursively
converted)
+/// - `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys stringified via
/// [BaseRecordExtractor#stringifyMapKey], values recursively converted)
/// - proto3 `optional` field that is unset / cleared → `null`
public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> {
diff --git
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
index 6716d4c82dc..99da5094414 100644
---
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractor.java
@@ -18,8 +18,11 @@
*/
package org.apache.pinot.plugin.inputformat.thrift;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
@@ -33,19 +36,19 @@ import org.apache.thrift.meta_data.FieldMetaData;
/// Extracts Pinot [GenericRow] from a Thrift-generated [TBase] via
`getFieldValue(fieldForId(...))`.
///
/// **Thrift source type → Java input → Java output type:**
-/// - thrift `bool` → `Boolean` → `Boolean`
-/// - thrift `i8` → `Byte` → `Integer` (widened by base)
-/// - thrift `i16` → `Short` → `Integer` (widened by base)
-/// - thrift `i32` → `Integer` → `Integer`
-/// - thrift `i64` → `Long` → `Long`
-/// - thrift `double` → `Double` → `Double`
-/// - thrift `string` → `String` → `String`
-/// - thrift `binary` → `ByteBuffer` → `byte[]`
-/// - thrift `enum` → `TEnum` → enum name `String` (via `toString()`)
-/// - thrift nested `struct` → [TBase] → `Map<String, Object>`
-/// - thrift `list<X>` / `set<X>` → `List<X>` / `Set<X>` → `Object[]`
-/// - thrift `map<K, V>` → `Map<K, V>` → `Map<Object, Object>`
-/// - thrift unset optional field → `null`
+/// - `bool` → `Boolean` → `Boolean`
+/// - `i8` → `Byte` → `Integer` (widened)
+/// - `i16` → `Short` → `Integer` (widened)
+/// - `i32` → `Integer` → `Integer`
+/// - `i64` → `Long` → `Long`
+/// - `double` → `Double` → `Double`
+/// - `string` → `String` → `String`
+/// - `binary` → `ByteBuffer` → `byte[]`
+/// - `enum` → `TEnum` → enum name `String` (via `toString()`)
+/// - `struct` → [TBase] → `Map<String, Object>`
+/// - `list<X>` / `set<X>` → `List<X>` / `Set<X>` → `Object[]`
+/// - `map<K, V>` → `Map<K, V>` → `Map<String, Object>` (keys stringified via
[BaseRecordExtractor#stringifyMapKey])
+/// - unset optional field → `null`
@SuppressWarnings({"rawtypes", "unchecked"})
public class ThriftRecordExtractor extends BaseRecordExtractor<TBase> {
@@ -63,50 +66,98 @@ public class ThriftRecordExtractor extends
BaseRecordExtractor<TBase> {
if (_extractAll) {
for (Map.Entry<String, Integer> nameToId : _fieldIds.entrySet()) {
Object value =
from.getFieldValue(from.fieldForId(nameToId.getValue()));
- if (value != null) {
- value = convert(value);
- }
- to.putValue(nameToId.getKey(), value);
+ to.putValue(nameToId.getKey(), value != null ? convert(value) : null);
}
} else {
for (String fieldName : _fields) {
Integer fieldId = _fieldIds.get(fieldName);
if (fieldId != null) {
Object value = from.getFieldValue(from.fieldForId(fieldId));
- if (value != null) {
- value = convert(value);
- }
- to.putValue(fieldName, value);
+ to.putValue(fieldName, value != null ? convert(value) : null);
}
}
}
return to;
}
- /**
- * Returns whether the object is a Thrift object.
- */
- @Override
- protected boolean isRecord(Object value) {
- return value instanceof TBase;
+ /// Dispatches a non-null Thrift value off its runtime Java type: `Object[]`
for `Collection` (list / set),
+ /// `Map<String, Object>` for `Map` and for nested `TBase` records,
single-value normalization for scalars.
+ private static Object convert(Object value) {
+ // List / Set (any Collection)
+ if (value instanceof Collection) {
+ return convertCollection((Collection<Object>) value);
+ }
+ // Map
+ if (value instanceof Map) {
+ return convertMap((Map<Object, Object>) value);
+ }
+ // Record
+ if (value instanceof TBase) {
+ return convertRecord((TBase) value);
+ }
+ // Single value
+ return convertSingleValue(value);
}
- /**
- * Handles the conversion of each field of a Thrift object.
- *
- * @param value should be verified to be a Thrift TBase type prior to
calling this method as it will be casted
- * without checking
- */
- @Override
- protected Map<String, Object> convertRecord(Object value) {
- TBase record = (TBase) value;
+ private static Object[] convertCollection(Collection<Object> collection) {
+ Object[] result = new Object[collection.size()];
+ int i = 0;
+ for (Object value : collection) {
+ result[i++] = value != null ? convert(value) : null;
+ }
+ return result;
+ }
+
+ /// Converts a `map<K, V>`. Keys flow through `convertSingleValue`, which
handles every scalar key type (bool /
+ /// i8 / i16 / i32 / i64 / double / string / binary), and are then
stringified via
+ /// [BaseRecordExtractor#stringifyMapKey] per the `Map<String, Object>`
contract. Entries with a `null` key are dropped; `null` values are kept.
+ private static Map<String, Object> convertMap(Map<Object, Object> map) {
+ Map<String, Object> result = Maps.newHashMapWithExpectedSize(map.size());
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ Object key = entry.getKey();
+ if (key == null) {
+ continue;
+ }
+ Object convertedKey = convertSingleValue(key);
+ if (convertedKey == null) {
+ continue;
+ }
+ Object value = entry.getValue();
+ result.put(stringifyMapKey(convertedKey), value != null ? convert(value)
: null);
+ }
+ return result;
+ }
+
+ private static Map<String, Object> convertRecord(TBase record) {
Set<TFieldIdEnum> fields =
FieldMetaData.getStructMetaDataMap(record.getClass()).keySet();
- Map<String, Object> convertedRecord =
Maps.newHashMapWithExpectedSize(fields.size());
+ Map<String, Object> result =
Maps.newHashMapWithExpectedSize(fields.size());
for (TFieldIdEnum field : fields) {
- Object fieldValue = record.getFieldValue(field);
- Object convertedValue = fieldValue != null ? convert(fieldValue) : null;
- convertedRecord.put(field.getFieldName(), convertedValue);
+ Object value = record.getFieldValue(field);
+ result.put(field.getFieldName(), value != null ? convert(value) : null);
+ }
+ return result;
+ }
+
+ /// Single-value normalization for Thrift's scalar Java types: `Byte` /
`Short` widen to `Integer`,
+ /// `ByteBuffer` is copied to `byte[]` (read through a `slice()` so the source
buffer's position is not advanced),
+ /// other `Number` / `Boolean` / `byte[]` pass through, and anything else (e.g.
`TEnum`) becomes its `toString()`.
+ @VisibleForTesting
+ static Object convertSingleValue(Object value) {
+ if (value instanceof Number) {
+ if (value instanceof Byte || value instanceof Short) {
+ return ((Number) value).intValue();
+ }
+ return value;
+ }
+ if (value instanceof Boolean || value instanceof byte[]) {
+ return value;
+ }
+ if (value instanceof ByteBuffer) {
+ ByteBuffer slice = ((ByteBuffer) value).slice();
+ byte[] bytes = new byte[slice.limit()];
+ slice.get(bytes);
+ return bytes;
}
- return convertedRecord;
+ return value.toString();
}
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
index 8a3ab442c75..81452e13925 100644
---
a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordExtractorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.inputformat.thrift;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,8 +34,10 @@ import static org.testng.Assert.assertNull;
/// Tests [ThriftRecordExtractor] — see its class Javadoc for the thrift
source type → Java output type
-/// matrix. Out of scope: thrift `i8` / `binary` are not declared in
`complex_types.thrift`; they follow the
-/// same `convert` contract — mirror the pattern below if added.
+/// matrix. Out of scope: thrift `i8` is not declared in
`complex_types.thrift`; it follows the same
+/// `convert` contract — mirror the pattern below if added. Thrift `binary` is
exercised against the
+/// extractor's static helper [ThriftRecordExtractor#convertSingleValue] (the
test schemas have no `binary`
+/// field, so end-to-end coverage isn't possible).
public class ThriftRecordExtractorTest {
// === Single value — order follows the type list in the class Javadoc ===
@@ -95,7 +98,7 @@ public class ThriftRecordExtractorTest {
@Test
public void testEnumExtractedAsString() {
- // BaseRecordExtractor.convertSingleValue falls back to toString() for
unknown types; TestEnum.toString()
+ // The extractor's single-value path falls back to toString() for unknown
types; TestEnum.toString()
// returns the enum constant name.
ComplexTypes record = baseRecord();
record.setEnumField(TestEnum.GAMMA);
@@ -103,6 +106,24 @@ public class ThriftRecordExtractorTest {
assertEquals(result, "GAMMA");
}
+ @Test
+ public void testBinaryByteBufferExtractedAsByteArray() {
+ // Thrift surfaces `binary` as `ByteBuffer`; the extractor materializes to
`byte[]`.
+ Object result =
ThriftRecordExtractor.convertSingleValue(ByteBuffer.wrap(new byte[]{1, 2, 3}));
+ assertEquals((byte[]) result, new byte[]{1, 2, 3});
+ }
+
+ @Test
+ public void testBinaryByteBufferSliceDoesNotMutateOriginal() {
+ // The extractor must read only the remaining bytes and leave the source
buffer's position untouched
+ // (the underlying buffer is owned by the reader and may be reused).
+ ByteBuffer buf = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4});
+ buf.position(2);
+ Object result = ThriftRecordExtractor.convertSingleValue(buf);
+ assertEquals((byte[]) result, new byte[]{2, 3, 4});
+ assertEquals(buf.position(), 2);
+ }
+
@Test
public void testNullForUnsetOptionalField() {
// optionalStringField is left unset; thrift returns null for unset
optional fields.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
index 6ee53815d32..f59f51ac20c 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
@@ -18,46 +18,16 @@
*/
package org.apache.pinot.spi.data.readers;
-import com.google.common.collect.Maps;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.Base64;
-import java.util.Collection;
-import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ArrayUtils;
-/// Default [RecordExtractor] implementation. Subclasses override only the
bits the format needs.
-///
-/// [#convert] dispatches by shape:
-/// - multi-value (`Collection` / non-`byte[]` array) → [#convertMultiValue] →
`Object[]` (each element recursed)
-/// - map → [#convertMap] → `Map<String, Object>` (keys converted via
[#convertSingleValue] then stringified via
-/// [#stringifyMapKey], each value recursed)
-/// - nested record → [#convertRecord] (throws by default; override for
formats with nested records)
-/// - everything else → [#convertSingleValue]
-///
-/// [#convertSingleValue] applies universal normalizations for primitive types
that any format extractor might
-/// produce:
-/// - `Byte` / `Short` widen to `Integer` so all small ints unify behind a
single Pinot type
-/// - `BigInteger` widens to `BigDecimal` (Pinot has no `BigInteger` data
type; downstream transforms handle
-/// `BigDecimal` natively)
-/// - other `Number` (`Integer` / `Long` / `Float` / `Double` / `BigDecimal`)
passes through
-/// - `Boolean` passes through
-/// - `byte[]` passes through
-/// - `ByteBuffer` materializes to `byte[]` (sliced so the source buffer's
position is not advanced)
-/// - everything else falls back to `value.toString()`
-///
-/// **Logical types (DECIMAL / TIMESTAMP / DATE / TIME / UUID) are NOT handled
here** — see [RecordExtractor]
-/// for the contract. Format-specific extractors do the native-to-contract
conversion themselves (e.g. the
-/// Avro extractor walks the schema in its own `extract` and never reaches
this dispatcher).
+/// Default [RecordExtractor] base providing include-list resolution via
[#init] and the [#stringifyMapKey] helper.
///
/// @param <T> the format of the input record
-@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
/// Include-list resolved from [#init]'s `fields` argument: empty when
[#_extractAll] is `true`, otherwise the
@@ -92,120 +62,6 @@ public abstract class BaseRecordExtractor<T> implements
RecordExtractor<T> {
protected void initConfig(@Nullable RecordExtractorConfig config) {
}
- /// Walks `value` through the dispatcher described on [BaseRecordExtractor]
and returns the
- /// contract-shaped result (single value, `Object[]`, or `Map<Object,
Object>`). Subclasses call this
- /// from their `extract` loop when the input is already a contract-friendly
Java type tree
- /// (`Collection` / `Map` / nested record). Schema-driven subclasses (Avro,
Parquet native) bypass this
- /// path entirely and produce contract values inline.
- ///
- /// The base impl never returns `null`; format-specific overrides may
translate format-native null
- /// sentinels by returning `null`. Java `null` inputs are short-circuited by
callers and never reach
- /// this method.
- @Nullable
- protected Object convert(Object value) {
- Object convertedValue;
- if (isMultiValue(value)) {
- convertedValue = convertMultiValue(value);
- } else if (isMap(value)) {
- convertedValue = convertMap(value);
- } else if (isRecord(value)) {
- convertedValue = convertRecord(value);
- } else {
- convertedValue = convertSingleValue(value);
- }
- return convertedValue;
- }
-
- /// Whether `value` is multi-value. Default: `Collection` or non-`byte[]`
array.
- protected boolean isMultiValue(Object value) {
- return value instanceof Collection || (value.getClass().isArray() &&
!(value instanceof byte[]));
- }
-
- /// Whether `value` is a map. Default: `instanceof Map`.
- protected boolean isMap(Object value) {
- return value instanceof Map;
- }
-
- /// Whether `value` is a nested record. Default `false`; override for
formats with nested record types.
- protected boolean isRecord(Object value) {
- return false;
- }
-
- /// Converts a multi-value ([Collection] / `Object[]` / primitive array)
into `Object[]`, recursing on each
- /// element via [#convert]. The base impl never returns `null`; the return
is marked nullable so format-specific
- /// overrides retain the option to translate format-native null sentinels
(e.g. an empty list interpreted as
- /// `null`).
- @Nullable
- protected Object[] convertMultiValue(Object value) {
- if (value instanceof Collection) {
- return convertCollection((Collection) value);
- }
- if (value instanceof Object[]) {
- return convertArray((Object[]) value);
- }
- return convertPrimitiveArray(value);
- }
-
- protected Object[] convertCollection(Collection collection) {
- Object[] convertedValues = new Object[collection.size()];
- int index = 0;
- for (Object value : collection) {
- convertedValues[index++] = value != null ? convert(value) : null;
- }
- return convertedValues;
- }
-
- protected Object[] convertArray(Object[] array) {
- int numValues = array.length;
- Object[] convertedValues = new Object[numValues];
- for (int i = 0; i < numValues; i++) {
- Object value = array[i];
- convertedValues[i] = value != null ? convert(value) : null;
- }
- return convertedValues;
- }
-
- protected Object[] convertPrimitiveArray(Object array) {
- if (array instanceof int[]) {
- return ArrayUtils.toObject((int[]) array);
- }
- if (array instanceof long[]) {
- return ArrayUtils.toObject((long[]) array);
- }
- if (array instanceof float[]) {
- return ArrayUtils.toObject((float[]) array);
- }
- if (array instanceof double[]) {
- return ArrayUtils.toObject((double[]) array);
- }
- throw new IllegalArgumentException("Unsupported primitive array type: " +
array.getClass().getName());
- }
-
- /// Converts a map, recursing on each value via [#convert]. Keys go through
[#convertSingleValue] (so format-native
- /// raw types like `ByteBuffer` / `Byte` / `BigInteger` reach the contract
form expected by [#stringifyMapKey])
- /// and are then stringified per the `Map<String, Object>` contract. Entries
with a `null` key — either input or
- /// post-conversion (a format-specific override may translate the key to
`null` for a format-native null sentinel)
- /// — are dropped. The base impl never returns `null`; the return is marked
nullable so format-specific overrides
- /// retain the option to translate format-native null sentinels.
- @Nullable
- protected Map<String, Object> convertMap(Object value) {
- Map<Object, Object> map = (Map) value;
- Map<String, Object> convertedMap =
Maps.newHashMapWithExpectedSize(map.size());
- for (Map.Entry<Object, Object> entry : map.entrySet()) {
- Object mapKey = entry.getKey();
- if (mapKey == null) {
- continue;
- }
- Object convertedKey = convertSingleValue(mapKey);
- if (convertedKey == null) {
- continue;
- }
- Object mapValue = entry.getValue();
- convertedMap.put(stringifyMapKey(convertedKey), mapValue != null ?
convert(mapValue) : null);
- }
- return convertedMap;
- }
-
/// Stringifies a map key per the `RecordExtractor` `Map<String, Object>`
contract. Single source of truth
/// for map-key stringification across every format extractor:
/// - `byte[]` → base64 (matches Jackson's `byte[]` value serialization, so
a serialized map reads
@@ -226,41 +82,4 @@ public abstract class BaseRecordExtractor<T> implements
RecordExtractor<T> {
}
return key.toString();
}
-
- /// Converts a nested record into a `Map<String, Object>`. Default throws —
override in formats that have nested
- /// records (Avro `GenericRecord`, Thrift `TBase`, ProtoBuf nested
`Message`). Marked nullable so format-specific
- /// overrides retain the option to translate format-native null sentinels.
- @Nullable
- protected Map<String, Object> convertRecord(Object value) {
- throw new UnsupportedOperationException(
- getClass().getSimpleName() + " does not support nested records;
override convertRecord() to enable.");
- }
-
- /// Applies the universal primitive normalizations documented on
[BaseRecordExtractor]. Format-specific extractors
- /// override this to add native-type handling (e.g. Avro `Instant` →
`Timestamp`, ProtoBuf `EnumValueDescriptor`
- /// → `String`). The base impl never returns `null`; the return is marked
nullable so format-specific overrides
- /// retain the option to translate format-native null sentinels.
- @Nullable
- protected Object convertSingleValue(Object value) {
- if (value instanceof Number) {
- if (value instanceof Byte || value instanceof Short) {
- return ((Number) value).intValue();
- }
- if (value instanceof BigInteger) {
- return new BigDecimal((BigInteger) value);
- }
- return value;
- }
- if (value instanceof Boolean || value instanceof byte[]) {
- return value;
- }
- if (value instanceof ByteBuffer) {
- // ByteBuffer might be reused by the reader. Slice to avoid advancing
the original buffer's position.
- ByteBuffer slice = ((ByteBuffer) value).slice();
- byte[] bytesValue = new byte[slice.limit()];
- slice.get(bytesValue);
- return bytesValue;
- }
- return value.toString();
- }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
index 5d0076b232c..4396117c4e7 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/BaseRecordExtractorTest.java
@@ -19,362 +19,77 @@
package org.apache.pinot.spi.data.readers;
import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Set;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-/// Tests what [BaseRecordExtractor] still owns after format-specific
extractors took over their own type
-/// conversions:
-/// - **Universal normalizations**: `Byte` / `Short` → `Integer`, [BigInteger]
→ [BigDecimal], `ByteBuffer` →
-/// `byte[]` (with slice-safety guarantee); passthrough for other `Number` /
`Boolean` / `byte[]` / `String`.
-/// - **Recursive walker** for nested complex inputs that format extractors
with `Collection` / `Map` / nested
-/// record shapes (Avro, Thrift, JSON) lean on:
-/// - `Collection` / `Object[]` / primitive `int[]` / `long[]` / `float[]` /
`double[]` → `Object[]`;
-/// unsupported primitive arrays (`boolean[]` / `short[]` / `char[]`)
throw.
-/// - `Map`: keys flow through `convertSingleValue`; values recursively
converted.
-/// - Nested record: default `convertRecord` throws unless the subclass
overrides it.
-/// - **Fallback**: any other type → `value.toString()`.
-///
-/// Date / time types are NOT exercised here — DATE / TIME passthrough and
TIMESTAMP narrowing are per-format
-/// (see e.g. `AvroRecordExtractorTest`).
public class BaseRecordExtractorTest {
- // === Single-value — order follows the type list in the class Javadoc ===
+ // === init / include-list resolution ===
@Test
- void testBooleanPreserved() {
- Object trueResult = extract(true);
- assertEquals(trueResult, true);
- Object falseResult = extract(false);
- assertEquals(falseResult, false);
+ public void testInitNullFieldsExtractsAll() {
+ NoOpExtractor extractor = new NoOpExtractor();
+ extractor.init(null, null);
+ assertTrue(extractor._extractAll);
+ assertEquals(extractor._fields, Set.of());
}
@Test
- void testByteWidenedToInteger() {
- Object result = extract((byte) 42);
- assertEquals(result, 42);
+ public void testInitEmptyFieldsExtractsAll() {
+ NoOpExtractor extractor = new NoOpExtractor();
+ extractor.init(Set.of(), null);
+ assertTrue(extractor._extractAll);
+ assertEquals(extractor._fields, Set.of());
}
@Test
- void testShortWidenedToInteger() {
- Object result = extract((short) 42);
- assertEquals(result, 42);
- }
-
- @Test
- void testIntegerPreserved() {
- Object result = extract(42);
- assertEquals(result, 42);
- }
-
- @Test
- void testLongPreserved() {
- Object result = extract(42L);
- assertEquals(result, 42L);
- }
-
- @Test
- void testFloatPreserved() {
- Object result = extract(1.5f);
- assertEquals(result, 1.5f);
- }
-
- @Test
- void testDoublePreserved() {
- Object result = extract(1.5d);
- assertEquals(result, 1.5d);
- }
-
- @Test
- void testBigDecimalPreserved() {
- Object result = extract(new BigDecimal("123.45"));
- assertEquals(result, new BigDecimal("123.45"));
- }
-
- @Test
- void testBigIntegerWidenedToBigDecimal() {
- // Pinot has no `BigInteger` type; the base widens to `BigDecimal` so
downstream transforms can handle it.
- Object result = extract(new BigInteger("12345678901234567890123456789"));
- assertEquals(result, new BigDecimal("12345678901234567890123456789"));
- }
-
- @Test
- void testStringPreserved() {
- Object result = extract("hello");
- assertEquals(result, "hello");
- }
-
- // === byte[] / ByteBuffer ===
-
- @Test
- void testByteArrayPreserved() {
- Object result = extract(new byte[]{1, 2, 3});
- assertEquals((byte[]) result, new byte[]{1, 2, 3});
- }
-
- @Test
- void testByteBufferConvertedToByteArray() {
- Object result = extract(ByteBuffer.wrap(new byte[]{1, 2, 3}));
- assertEquals((byte[]) result, new byte[]{1, 2, 3});
- }
-
- @Test
- void testByteBufferSliceDoesNotMutateOriginal() {
- // The extractor must read only the remaining bytes and leave the source
buffer's position untouched.
- ByteBuffer buf = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4});
- buf.position(2);
- Object result = extract(buf);
- assertEquals((byte[]) result, new byte[]{2, 3, 4});
- assertEquals(buf.position(), 2);
- }
-
-
- // === Fallback / null ===
-
- @Test
- void testNonStandardObjectFallsBackToToString() {
- Object result = extract(new StringBuilder("hello"));
- assertEquals(result, "hello");
- }
-
- // === Multi-value (Collection / Object[] / primitive array) → Object[] ===
-
- @Test
- void testListExtractedAsArray() {
- Object[] result = (Object[]) extract(List.of(1, 2, 3));
- assertEquals(result, new Object[]{1, 2, 3});
- }
-
- @Test
- void testObjectArrayExtractedAsArray() {
- Object[] result = (Object[]) extract(new Object[]{1, "a", true});
- assertEquals(result, new Object[]{1, "a", true});
- }
-
- @Test
- void testIntPrimitiveArrayExtractedAsArray() {
- Object[] result = (Object[]) extract(new int[]{10, 20, 30});
- assertEquals(result, new Object[]{10, 20, 30});
- }
-
- @Test
- void testLongPrimitiveArrayExtractedAsArray() {
- Object[] result = (Object[]) extract(new long[]{10L, 20L, 30L});
- assertEquals(result, new Object[]{10L, 20L, 30L});
- }
-
- @Test
- void testFloatPrimitiveArrayExtractedAsArray() {
- Object[] result = (Object[]) extract(new float[]{1.5f, 2.5f});
- assertEquals(result, new Object[]{1.5f, 2.5f});
- }
-
- @Test
- void testDoublePrimitiveArrayExtractedAsArray() {
- Object[] result = (Object[]) extract(new double[]{1.5d, 2.5d});
- assertEquals(result, new Object[]{1.5d, 2.5d});
- }
-
- @Test
- void testUnsupportedPrimitiveArrayThrows() {
- expectThrows(IllegalArgumentException.class, () -> extract(new char[]{'a',
'b'}));
- }
-
- @Test
- void testListPreservesNullElements() {
- // Shape preservation: null elements stay null in `Object[]`.
- Object[] result = (Object[]) extract(Arrays.asList(1, null, 3));
- assertEquals(result, new Object[]{1, null, 3});
- }
-
- @Test
- void testObjectArrayPreservesNullElements() {
- Object[] result = (Object[]) extract(new Object[]{1, null, 3});
- assertEquals(result, new Object[]{1, null, 3});
- }
-
- @Test
- void testEmptyListExtractedAsEmptyArray() {
- Object[] result = (Object[]) extract(List.of());
- assertEquals(result, new Object[]{});
- }
-
- @Test
- void testEmptyObjectArrayExtractedAsEmptyArray() {
- Object[] result = (Object[]) extract(new Object[]{});
- assertEquals(result, new Object[]{});
- }
-
- // === Map (recursively converted) ===
-
- @Test
- void testMapWithStringKeys() {
- Map<?, ?> result = (Map<?, ?>) extract(Map.of(
- "k1", 1,
- "k2", "foo"
- ));
- assertEquals(result.size(), 2);
- assertEquals(result.get("k1"), 1);
- assertEquals(result.get("k2"), "foo");
- }
-
- @Test
- void testMapValuesRecursivelyConverted() {
- // Inner List values become Object[]; inner Short values widen to Integer.
- Map<?, ?> result = (Map<?, ?>) extract(Map.of(
- "list", List.of(1, 2),
- "short", (short) 7
- ));
- assertEquals((Object[]) result.get("list"), new Object[]{1, 2});
- assertEquals(result.get("short"), 7);
- }
-
- @Test
- void testMapKeysRunThroughConvertSingleValue() {
- // Keys flow through `convertSingleValue` before `stringifyMapKey`.
ByteBuffer materializes to byte[] then
- // base64-encodes; Byte/Short/BigInteger widen before `toString()`.
- HashMap<Object, Object> input = new HashMap<>();
- input.put(ByteBuffer.wrap(new byte[]{1, 2, 3}), "bytes");
- input.put((byte) 7, "byte");
- input.put((short) 8, "short");
- input.put(new BigInteger("12345678901234567890"), "bigint");
- Map<?, ?> result = (Map<?, ?>) extract(input);
- assertEquals(result, Map.of(
- "AQID", "bytes",
- "7", "byte",
- "8", "short",
- "12345678901234567890", "bigint"
- ));
- }
-
- @Test
- void testMapPreservesNullValues() {
- // Shape preservation: null values stay null (only null *keys* drop the
entry).
- HashMap<String, Object> input = new HashMap<>();
- input.put("k1", 1);
- input.put("k2", null);
- Map<?, ?> result = (Map<?, ?>) extract(input);
- assertEquals(result.size(), 2);
- assertEquals(result.get("k1"), 1);
- assertNull(result.get("k2"));
- assertTrue(result.containsKey("k2"));
- }
-
- @Test
- void testMapDropsNullKeyEntry() {
- HashMap<Object, Object> input = new HashMap<>();
- input.put("k1", 1);
- input.put(null, "ignored");
- Map<?, ?> result = (Map<?, ?>) extract(input);
- assertEquals(result, Map.of("k1", 1));
- }
-
- @Test
- void testMapDropsEntryWhenConvertSingleValueReturnsNull() {
- // Format-specific overrides may translate a non-null Java key to `null`
(a format-native null sentinel).
- // The entry must be dropped — converted-key check, not just input-key
check.
- Map<?, ?> result = (Map<?, ?>) new SentinelKeyExtractor().convert(Map.of(
- "k1", 1,
- "__SENTINEL__", 2
- ));
- assertEquals(result, Map.of("k1", 1));
- }
-
- @Test
- void testEmptyMapExtractedAsEmptyMap() {
- Map<?, ?> result = (Map<?, ?>) extract(Map.of());
- assertEquals(result.size(), 0);
+ public void testInitNonEmptyFieldsRestrictsToIncludeList() {
+ NoOpExtractor extractor = new NoOpExtractor();
+ extractor.init(Set.of("a", "b"), null);
+ assertFalse(extractor._extractAll);
+ assertEquals(extractor._fields, Set.of("a", "b"));
}
// === stringifyMapKey — shared helper for the Map<String, Object> contract
===
@Test
- void testStringifyMapKeyByteArrayBase64Encoded() {
+ public void testStringifyMapKeyByteArrayBase64Encoded() {
assertEquals(BaseRecordExtractor.stringifyMapKey(new byte[]{0, 1, 2, 3}),
"AAECAw==");
}
@Test
- void testStringifyMapKeyTimestampUsesIsoUtcWithNanos() {
- java.sql.Timestamp ts = new java.sql.Timestamp(1700000000123L);
+ public void testStringifyMapKeyTimestampUsesIsoUtcWithNanos() {
+ Timestamp ts = new Timestamp(1700000000123L);
ts.setNanos(123_456_789);
// ISO-8601 UTC, JVM-TZ-stable, full nanosecond precision preserved.
assertEquals(BaseRecordExtractor.stringifyMapKey(ts),
"2023-11-14T22:13:20.123456789Z");
}
@Test
- void testStringifyMapKeyOtherTypesUseToString() {
+ public void testStringifyMapKeyOtherTypesUseToString() {
assertEquals(BaseRecordExtractor.stringifyMapKey("k"), "k");
assertEquals(BaseRecordExtractor.stringifyMapKey(42), "42");
assertEquals(BaseRecordExtractor.stringifyMapKey(123L), "123");
assertEquals(BaseRecordExtractor.stringifyMapKey(true), "true");
assertEquals(BaseRecordExtractor.stringifyMapKey(new BigDecimal("1.50")),
"1.50");
-
assertEquals(BaseRecordExtractor.stringifyMapKey(java.time.LocalDate.of(2024,
1, 1)), "2024-01-01");
- assertEquals(BaseRecordExtractor.stringifyMapKey(java.time.LocalTime.of(8,
51, 32)), "08:51:32");
+ assertEquals(BaseRecordExtractor.stringifyMapKey(LocalDate.of(2024, 1,
1)), "2024-01-01");
+ assertEquals(BaseRecordExtractor.stringifyMapKey(LocalTime.of(8, 51, 32)),
"08:51:32");
}
- // === Nested record — base throws unless overridden ===
-
- @Test
- void testNestedRecordThrowsByDefault() {
- UnsupportedOperationException ex =
expectThrows(UnsupportedOperationException.class,
- () -> new RecordCapableExtractor().convert(new Object()));
- assertEquals(ex.getMessage(),
- "RecordCapableExtractor does not support nested records; override
convertRecord() to enable.");
- }
+ // === Helper ===
- // === Helpers ===
-
- private static Object extract(Object value) {
- return new TestExtractor().convert(value);
- }
-
- /// Minimal subclass that doesn't override anything — exercises the default
conversion paths in
- /// [BaseRecordExtractor].
- private static final class TestExtractor extends BaseRecordExtractor<Object>
{
+ private static final class NoOpExtractor extends BaseRecordExtractor<Object>
{
@Override
public GenericRow extract(Object from, GenericRow to) {
throw new UnsupportedOperationException();
}
}
-
- /// Subclass that returns `true` from [#isRecord] for any input but does NOT
override `convertRecord` —
- /// triggers the default `UnsupportedOperationException`.
- private static final class RecordCapableExtractor extends
BaseRecordExtractor<Object> {
- @Override
- public GenericRow extract(Object from, GenericRow to) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean isRecord(Object value) {
- return true;
- }
- }
-
- /// Subclass that returns `null` from [#convertSingleValue] for the
`"__SENTINEL__"` key — simulates a format's
- /// null-sentinel translation. Used to verify that `convertMap` drops
entries whose post-conversion key is
- /// `null`, not just entries whose input key is `null`.
- private static final class SentinelKeyExtractor extends
BaseRecordExtractor<Object> {
- @Override
- public GenericRow extract(Object from, GenericRow to) {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- protected Object convertSingleValue(Object value) {
- return "__SENTINEL__".equals(value) ? null :
super.convertSingleValue(value);
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]