Copilot commented on code in PR #440:
URL: https://github.com/apache/fluss-rust/pull/440#discussion_r2909045538


##########
crates/fluss/src/row/mod.rs:
##########
@@ -17,10 +17,11 @@
 
 mod column;
 
-mod datum;
+pub(crate) mod datum;
 mod decimal;
 
 pub mod binary;
+pub mod column_writer;

Review Comment:
   `datum` was changed to `pub(crate)` and `column_writer` is exported as 
`pub`. If these modules/types are only intended for the internal Arrow write 
path, consider keeping them crate-private (e.g. `pub(super)`/`pub(crate)` for 
`column_writer` and avoid widening `datum` visibility) to prevent committing to 
a larger public API surface.



##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed column writers that write directly from [`InternalRow`] to concrete
+//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime
+//! `downcast_mut` dispatch.
+
+use crate::error::Error::RowConvertError;
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::InternalRow;
+use crate::row::datum::{
+    MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, 
append_decimal_to_builder,
+    millis_nanos_to_micros, millis_nanos_to_nanos,
+};
+use arrow::array::{
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder,
+};
+use arrow_schema::DataType as ArrowDataType;
+
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch 
building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
+/// A typed column writer that reads one column from an [`InternalRow`] and
+/// appends directly to a concrete Arrow builder — no intermediate [`Datum`],
+/// no `as_any_mut().downcast_mut()`.
+pub struct ColumnWriter {
+    pos: usize,
+    nullable: bool,
+    inner: TypedWriter,
+}
+
+enum TypedWriter {
+    Bool(BooleanBuilder),
+    Int8(Int8Builder),
+    Int16(Int16Builder),
+    Int32(Int32Builder),
+    Int64(Int64Builder),
+    Float32(Float32Builder),
+    Float64(Float64Builder),
+    Char {
+        len: usize,
+        builder: StringBuilder,
+    },
+    String(StringBuilder),
+    Bytes(BinaryBuilder),
+    Binary {
+        len: usize,
+        builder: FixedSizeBinaryBuilder,
+    },
+    Decimal128 {
+        src_precision: usize,
+        src_scale: usize,
+        target_precision: u32,
+        target_scale: i64,
+        builder: Decimal128Builder,
+    },
+    Date32(Date32Builder),
+    Time32Second(Time32SecondBuilder),
+    Time32Millisecond(Time32MillisecondBuilder),
+    Time64Microsecond(Time64MicrosecondBuilder),
+    Time64Nanosecond(Time64NanosecondBuilder),
+    TimestampNtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampNtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampNtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampNtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+    TimestampLtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampLtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampLtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampLtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+}
+
+impl ColumnWriter {
+    /// Create a column writer for the given Fluss `DataType` and Arrow
+    /// `ArrowDataType` at position `pos` with the given pre-allocation
+    /// `capacity`.
+    pub fn create(
+        fluss_type: &DataType,
+        arrow_type: &ArrowDataType,
+        pos: usize,
+        capacity: usize,
+    ) -> Result<Self> {
+        let nullable = fluss_type.is_nullable();
+
+        let inner = match fluss_type {
+            DataType::Boolean(_) => 
TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)),
+            DataType::TinyInt(_) => 
TypedWriter::Int8(Int8Builder::with_capacity(capacity)),
+            DataType::SmallInt(_) => 
TypedWriter::Int16(Int16Builder::with_capacity(capacity)),
+            DataType::Int(_) => 
TypedWriter::Int32(Int32Builder::with_capacity(capacity)),
+            DataType::BigInt(_) => 
TypedWriter::Int64(Int64Builder::with_capacity(capacity)),
+            DataType::Float(_) => 
TypedWriter::Float32(Float32Builder::with_capacity(capacity)),
+            DataType::Double(_) => 
TypedWriter::Float64(Float64Builder::with_capacity(capacity)),
+            DataType::Char(t) => TypedWriter::Char {
+                len: t.length() as usize,
+                builder: StringBuilder::with_capacity(
+                    capacity,
+                    capacity * VARIABLE_WIDTH_AVG_BYTES,
+                ),
+            },
+            DataType::String(_) => 
TypedWriter::String(StringBuilder::with_capacity(
+                capacity,
+                capacity * VARIABLE_WIDTH_AVG_BYTES,
+            )),
+            DataType::Bytes(_) => 
TypedWriter::Bytes(BinaryBuilder::with_capacity(
+                capacity,
+                capacity * VARIABLE_WIDTH_AVG_BYTES,
+            )),
+            DataType::Binary(t) => {
+                let arrow_len: i32 = t.length().try_into().map_err(|_| 
Error::IllegalArgument {
+                    message: format!(
+                        "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+                        t.length()
+                    ),
+                })?;
+                TypedWriter::Binary {
+                    len: t.length(),
+                    builder: FixedSizeBinaryBuilder::with_capacity(capacity, 
arrow_len),
+                }
+            }
+            DataType::Decimal(dt) => {
+                let (target_p, target_s) = match arrow_type {
+                    ArrowDataType::Decimal128(p, s) => (*p, *s),
+                    _ => {
+                        return Err(Error::IllegalArgument {
+                            message: format!(
+                                "Expected Decimal128 Arrow type for Decimal, 
got: {arrow_type:?}"
+                            ),
+                        });
+                    }
+                };
+                if target_s < 0 {
+                    return Err(Error::IllegalArgument {
+                        message: format!("Negative decimal scale {target_s} is 
not supported"),
+                    });
+                }
+                let builder = Decimal128Builder::with_capacity(capacity)
+                    .with_precision_and_scale(target_p, target_s)
+                    .map_err(|e| Error::IllegalArgument {
+                        message: format!(
+                            "Invalid decimal precision {target_p} or scale 
{target_s}: {e}"
+                        ),
+                    })?;
+                TypedWriter::Decimal128 {
+                    src_precision: dt.precision() as usize,
+                    src_scale: dt.scale() as usize,
+                    target_precision: target_p as u32,
+                    target_scale: target_s as i64,
+                    builder,
+                }
+            }
+            DataType::Date(_) => 
TypedWriter::Date32(Date32Builder::with_capacity(capacity)),
+            DataType::Time(_) => match arrow_type {
+                ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => {
+                    
TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity))
+                }
+                ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                    
TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity(
+                        capacity,
+                    ))
+                }
+                ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                    
TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(
+                        capacity,
+                    ))
+                }
+                ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+                    
TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity))
+                }
+                _ => {
+                    return Err(Error::IllegalArgument {
+                        message: format!("Unsupported Arrow type for Time: 
{arrow_type:?}"),
+                    });
+                }
+            },
+            DataType::Timestamp(t) => {
+                let precision = t.precision();
+                match arrow_type {
+                    ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
+                        TypedWriter::TimestampNtzSecond {
+                            precision,
+                            builder: 
TimestampSecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+                        TypedWriter::TimestampNtzMillisecond {
+                            precision,

Review Comment:
   The Timestamp/TimestampLTz writer selection matches 
`ArrowDataType::Timestamp(unit, _)`, accepting any timezone. Since the 
timestamp builders used here produce arrays with `timezone = None`, consider 
matching only `ArrowDataType::Timestamp(unit, None)` (or erroring on `Some(_)`) 
so schema mismatches fail fast during `ColumnWriter::create`.



##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed column writers that write directly from [`InternalRow`] to concrete
+//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime
+//! `downcast_mut` dispatch.
+
+use crate::error::Error::RowConvertError;
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::InternalRow;
+use crate::row::datum::{
+    MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, 
append_decimal_to_builder,
+    millis_nanos_to_micros, millis_nanos_to_nanos,
+};
+use arrow::array::{
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder,
+};
+use arrow_schema::DataType as ArrowDataType;
+
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch 
building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
+/// A typed column writer that reads one column from an [`InternalRow`] and
+/// appends directly to a concrete Arrow builder — no intermediate [`Datum`],
+/// no `as_any_mut().downcast_mut()`.
+pub struct ColumnWriter {
+    pos: usize,
+    nullable: bool,
+    inner: TypedWriter,
+}
+
+enum TypedWriter {
+    Bool(BooleanBuilder),
+    Int8(Int8Builder),
+    Int16(Int16Builder),
+    Int32(Int32Builder),
+    Int64(Int64Builder),
+    Float32(Float32Builder),
+    Float64(Float64Builder),
+    Char {
+        len: usize,
+        builder: StringBuilder,
+    },
+    String(StringBuilder),
+    Bytes(BinaryBuilder),
+    Binary {
+        len: usize,
+        builder: FixedSizeBinaryBuilder,
+    },
+    Decimal128 {
+        src_precision: usize,
+        src_scale: usize,
+        target_precision: u32,
+        target_scale: i64,
+        builder: Decimal128Builder,
+    },
+    Date32(Date32Builder),
+    Time32Second(Time32SecondBuilder),
+    Time32Millisecond(Time32MillisecondBuilder),
+    Time64Microsecond(Time64MicrosecondBuilder),
+    Time64Nanosecond(Time64NanosecondBuilder),
+    TimestampNtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampNtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampNtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampNtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+    TimestampLtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampLtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampLtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampLtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+}
+
+impl ColumnWriter {
+    /// Create a column writer for the given Fluss `DataType` and Arrow
+    /// `ArrowDataType` at position `pos` with the given pre-allocation
+    /// `capacity`.
+    pub fn create(
+        fluss_type: &DataType,
+        arrow_type: &ArrowDataType,
+        pos: usize,
+        capacity: usize,
+    ) -> Result<Self> {

Review Comment:
   `ColumnWriter::create` takes both a Fluss `DataType` and an expected Arrow 
`ArrowDataType`, but for many variants the `arrow_type` parameter is ignored. 
Consider validating that `arrow_type` matches the Arrow type implied by 
`fluss_type` and returning an `IllegalArgument` on mismatch so callers fail 
fast instead of later via RecordBatch type checks.



##########
crates/fluss/src/row/column_writer.rs:
##########
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed column writers that write directly from [`InternalRow`] to concrete
+//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime
+//! `downcast_mut` dispatch.
+
+use crate::error::Error::RowConvertError;
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::InternalRow;
+use crate::row::datum::{
+    MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, 
append_decimal_to_builder,
+    millis_nanos_to_micros, millis_nanos_to_nanos,
+};
+use arrow::array::{
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder,
+};
+use arrow_schema::DataType as ArrowDataType;
+
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch 
building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
+/// A typed column writer that reads one column from an [`InternalRow`] and
+/// appends directly to a concrete Arrow builder — no intermediate [`Datum`],
+/// no `as_any_mut().downcast_mut()`.
+pub struct ColumnWriter {
+    pos: usize,
+    nullable: bool,
+    inner: TypedWriter,
+}
+
+enum TypedWriter {
+    Bool(BooleanBuilder),
+    Int8(Int8Builder),
+    Int16(Int16Builder),
+    Int32(Int32Builder),
+    Int64(Int64Builder),
+    Float32(Float32Builder),
+    Float64(Float64Builder),
+    Char {
+        len: usize,
+        builder: StringBuilder,
+    },
+    String(StringBuilder),
+    Bytes(BinaryBuilder),
+    Binary {
+        len: usize,
+        builder: FixedSizeBinaryBuilder,
+    },
+    Decimal128 {
+        src_precision: usize,
+        src_scale: usize,
+        target_precision: u32,
+        target_scale: i64,
+        builder: Decimal128Builder,
+    },
+    Date32(Date32Builder),
+    Time32Second(Time32SecondBuilder),
+    Time32Millisecond(Time32MillisecondBuilder),
+    Time64Microsecond(Time64MicrosecondBuilder),
+    Time64Nanosecond(Time64NanosecondBuilder),
+    TimestampNtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampNtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampNtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampNtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+    TimestampLtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampLtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampLtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampLtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+}
+
+impl ColumnWriter {
+    /// Create a column writer for the given Fluss `DataType` and Arrow
+    /// `ArrowDataType` at position `pos` with the given pre-allocation
+    /// `capacity`.
+    pub fn create(
+        fluss_type: &DataType,
+        arrow_type: &ArrowDataType,
+        pos: usize,
+        capacity: usize,
+    ) -> Result<Self> {
+        let nullable = fluss_type.is_nullable();
+
+        let inner = match fluss_type {
+            DataType::Boolean(_) => 
TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)),
+            DataType::TinyInt(_) => 
TypedWriter::Int8(Int8Builder::with_capacity(capacity)),
+            DataType::SmallInt(_) => 
TypedWriter::Int16(Int16Builder::with_capacity(capacity)),
+            DataType::Int(_) => 
TypedWriter::Int32(Int32Builder::with_capacity(capacity)),
+            DataType::BigInt(_) => 
TypedWriter::Int64(Int64Builder::with_capacity(capacity)),
+            DataType::Float(_) => 
TypedWriter::Float32(Float32Builder::with_capacity(capacity)),
+            DataType::Double(_) => 
TypedWriter::Float64(Float64Builder::with_capacity(capacity)),
+            DataType::Char(t) => TypedWriter::Char {
+                len: t.length() as usize,
+                builder: StringBuilder::with_capacity(
+                    capacity,
+                    capacity * VARIABLE_WIDTH_AVG_BYTES,
+                ),
+            },

Review Comment:
   `capacity * VARIABLE_WIDTH_AVG_BYTES` can overflow `usize` for large 
capacities (panic in debug / wrap in release), and `ColumnWriter::create` is 
public. Consider using `checked_mul` (return an `IllegalArgument` on overflow) 
or `saturating_mul` when computing the data buffer capacity for variable-width 
builders.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to