Jackie-Jiang commented on code in PR #18434:
URL: https://github.com/apache/pinot/pull/18434#discussion_r3199610362
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java:
##########
@@ -18,52 +18,68 @@
*/
package org.apache.pinot.plugin.inputformat.arrow;
-
import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into Pinot GenericRow.
- * This decoder handles Arrow streaming format and converts Arrow data to Pinot's columnar format.
- */
+
+/// Decodes Apache Arrow IPC stream-format messages into Pinot [GenericRow]s. The output shape depends on the Arrow
+/// batch's row count:
+/// - 0 row → returns `null` (nothing to ingest).
+/// - 1 row → the single row's fields are populated directly into the destination [GenericRow].
+/// - multiple rows → the rows are wrapped in a `List<GenericRow>` stored under [GenericRow#MULTIPLE_RECORDS_KEY]
+/// on the destination.
public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB default
- private static final Logger logger = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ArrowMessageDecoder.class);
+ private ArrowRecordExtractor _extractor;
private String _streamTopicName;
private RootAllocator _allocator;
- private ArrowToGenericRowConverter _converter;
@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
throws Exception {
+ // Resolve the extractor + config classes from props. Defaults to `ArrowRecordExtractor` /
+ // `ArrowRecordExtractorConfig`; user-supplied extractors must subclass [ArrowRecordExtractor]
+ // so the per-batch `setReader` hook is honored.
+ String extractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
+ String configClass = props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
+ if (extractorClass == null) {
+ extractorClass = ArrowRecordExtractor.class.getName();
+ configClass = ArrowRecordExtractorConfig.class.getName();
+ }
+ RecordExtractorConfig extractorConfig = null;
+ if (configClass != null) {
+ extractorConfig = PluginManager.get().createInstance(configClass);
+ extractorConfig.init(props);
+ }
+ _extractor = PluginManager.get().createInstance(extractorClass);
Review Comment:
Fixed — `init` now creates the extractor as an `Object` and validates with
`Preconditions.checkState(extractor instanceof ArrowRecordExtractor, ...)`,
failing with a clear message naming the offending class. The check is needed
for Arrow specifically because the field is typed as the concrete
`ArrowRecordExtractor` (the per-batch `setReader` / `prepareBatch` hooks live
on this base); Avro / JSON use the abstract `RecordExtractor<T>` and don't need
it.
##########
pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordExtractor.java:
##########
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.utils.TimestampUtils;
+
+
+/// Extracts a single Arrow row into a [GenericRow]. Reader-scoped state ([VectorSchemaRoot] +
+/// dictionary map) is bound once via [#setReader]; per-row [#extract] calls take a [Record] holding
+/// only the row index. Dispatch is schema-driven — each column is walked using its [Field], so the
+/// logical type drives the conversion rather than the runtime Java type of the value.
+///
+/// **Scalars** (Arrow type → Java output):
+/// - `Bool` → `Boolean`
+/// - `Int(8/16)` → `Integer` (widened from `Byte` / `Short`)
+/// - `Int(32)` → `Integer`
+/// - `Int(64)` → `Long`
+/// - `FloatingPoint(SINGLE)` → `Float`
+/// - `FloatingPoint(DOUBLE)` → `Double`
+/// - `Decimal` → `BigDecimal`
+/// - `Utf8` / `LargeUtf8` → `String` (via `Text.toString()`)
+/// - `Binary` / `LargeBinary` / `FixedSizeBinary` → `byte[]`
+/// - `Null` → `null` (every row is null by definition)
+///
+/// **Temporal** (per the schema's `DateUnit` / `TimeUnit`):
+/// - `Timestamp` no-TZ → [Timestamp] (Arrow surfaces all four units as `LocalDateTime`; interpreted
+/// as a UTC instant)
+/// - `Timestamp` with-TZ → [Timestamp] (Arrow surfaces all four units as `Long` epoch; constructed
+/// per the schema's `TimeUnit`, sub-millisecond precision preserved via [TimestampUtils])
+/// - `Date` → [LocalDate] (`DateDayVector` surfaces as `Integer` raw days; `DateMilliVector` as
+/// `LocalDateTime` at UTC midnight — both reduce to a calendar date)
+/// - `Time` → [LocalTime] (`TimeSecVector` as `Integer`, `TimeMilliVector` as `LocalDateTime`,
+/// `TimeMicroVector` / `TimeNanoVector` as `Long` — all collapse onto nanoseconds-since-midnight)
+/// - `Interval` / `Duration` → ISO-8601 `String` via `value.toString()` — `java.time.Period` /
+/// `java.time.Duration` / `PeriodDuration` all have meaningful toString (e.g. `"P1Y2M"`,
+/// `"PT5H30M"`, `"P1Y2M3D PT4H5M6S"`)
+///
+/// With `extractRawTimeValues = true` ([ArrowRecordExtractorConfig]) the `Date` / `Time` /
+/// `Timestamp` cases bypass the contract conversion: `Date` → `int` days-since-epoch (regardless of
+/// `DateUnit` — `DateMilli` is always UTC midnight, so reducing to days is lossless); `Time` /
+/// `Timestamp` → raw `int` / `long` in the schema's `TimeUnit`. `Interval` / `Duration` are
+/// unaffected. Temporal values that surface inside a `Union` branch don't see the bypass either —
+/// the chosen branch's [Field] isn't visible from the value alone, so we can't pick a unit; they
+/// always coerce to `Timestamp` UTC.
+///
+/// **Complex** (recurse with the [Field]'s child fields):
+/// - `List` / `LargeList` / `FixedSizeList` → `Object[]`
+/// - `Struct` → `Map<String, Object>`
+/// - `Map` → `Map<String, Object>` (Arrow's `List<Map<KEY, VALUE>>` entry list is flattened;
+/// keys are stringified per [BaseRecordExtractor#stringifyMapKey])
+/// - `Union` → recursively dispatched by the value's runtime Java type (the chosen branch isn't
+/// visible from the value alone — nested complex sub-branches fall back to `value.toString()`)
+///
+/// **Other**:
+/// - dictionary-encoded vector → decoded against the bound dictionary, then dispatched on the
+/// decoded vector's [Field] (so the logical type — e.g. `Utf8` — drives conversion, not the
+/// dictionary's index type)
+///
+/// Unrecognized types (`NONE` / future Arrow additions) throw [IllegalStateException].
+///
+/// **Quirks worth knowing:**
+/// - `UInt2Vector` (unsigned 16-bit) returns `Character`, not a `Number` — Arrow's Java bindings
+/// use `char` as the only natively unsigned primitive. We widen to `int` per the contract.
+/// - `DateDayVector` / `DateMilliVector` return *different* Java types (`Integer` vs
+/// `LocalDateTime`) for the same logical `DATE` type — historical asymmetry in Arrow's API.
+public class ArrowRecordExtractor extends BaseRecordExtractor<ArrowRecordExtractor.Record> {
+
+ /// One Arrow row's worth of state — just the row index. The reader-scoped state ([VectorSchemaRoot] + [ArrowReader])
+ /// is bound to the extractor itself via [#setReader].
+ public static final class Record {
+ int _rowId;
+
+ public void set(int rowId) {
+ _rowId = rowId;
+ }
+ }
+
+ private boolean _extractRawTimeValues;
+
+ // Reader-scoped state — initialized in [#setReader], read by per-row [#extract]. The dictionary map
+ // is held so per-row lookups don't re-traverse the reader; the field vectors are pre-resolved against
+ // the include list so the per-row loop is a flat array walk (names are read inline from each field
+ // vector — `Field#getName` is a plain getter).
+ private Map<Long, Dictionary> _dictionaries;
+ private FieldVector[] _fieldVectors;
+
+ @Override
+ protected void initConfig(@Nullable RecordExtractorConfig config) {
+ if (config instanceof ArrowRecordExtractorConfig) {
+ _extractRawTimeValues = ((ArrowRecordExtractorConfig) config).isExtractRawTimeValues();
+ }
+ }
+
+ /// Binds the extractor to `reader` for the upcoming run of [#extract] calls. Must be called before
+ /// [#extract] — once per file (`ArrowRecordReader`) or per `decode()` call (`ArrowMessageDecoder`).
+ /// Resolves the include list against the reader's [VectorSchemaRoot] and stashes the dictionary map.
+ public void setReader(ArrowReader reader)
+ throws IOException {
+ _dictionaries = reader.getDictionaryVectors();
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
+ if (_extractAll) {
+ _fieldVectors = fieldVectors.toArray(new FieldVector[0]);
+ } else {
+ List<FieldVector> matched = new ArrayList<>(_fields.size());
+ for (FieldVector fieldVector : fieldVectors) {
+ if (_fields.contains(fieldVector.getField().getName())) {
+ matched.add(fieldVector);
+ }
+ }
+ _fieldVectors = matched.toArray(new FieldVector[0]);
+ }
+ }
+
+ @Override
+ public GenericRow extract(Record from, GenericRow to) {
+ for (FieldVector fieldVector : _fieldVectors) {
+ to.putValue(fieldVector.getField().getName(), extractValue(fieldVector, from._rowId));
+ }
+ return to;
+ }
+
+ /// Reads the raw Arrow value at `rowId` (decoding the dictionary-encoded vector against the bound
+ /// dictionary map first when applicable), then dispatches by the [Field]'s logical type. The decoded
+ /// vector's [Field] (not the original encoded one) drives the dispatch so the logical type — e.g.
+ /// `Utf8` — is used instead of the dictionary's index type.
+ @Nullable
+ private Object extractValue(FieldVector fieldVector, int rowId) {
+ DictionaryEncoding dictionaryEncoding = fieldVector.getField().getDictionary();
+ if (dictionaryEncoding != null) {
+ try (ValueVector decoded = DictionaryEncoder.decode(fieldVector, _dictionaries.get(dictionaryEncoding.getId()))) {
+ Object rawValue = decoded.getObject(rowId);
+ return rawValue != null ? convert(decoded.getField(), rawValue) : null;
Review Comment:
Fixed — added a `prepareBatch(Record)` lifecycle hook on
`ArrowRecordExtractor`. It decodes each dictionary-encoded column once into
`Record._activeVectors`; per-row `extract` reads the pre-decoded vector.
`Record` is `AutoCloseable` and owns the decoded vectors (closed on next
`prepareBatch` or via `Record.close()`). `ArrowRecordReader` calls
`prepareBatch` after each `loadNextBatch`; `ArrowMessageDecoder` calls it once
per `decode`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]