leekeiabstraction commented on code in PR #442:
URL: https://github.com/apache/fluss-rust/pull/442#discussion_r2942537008


##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -237,6 +238,26 @@ impl InnerValueWriter {
             (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
                 writer.write_timestamp_ltz(ts, *p);
             }
+            (InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => {

Review Comment:
   Why don't we delegate here, like on the Java side?
   
   
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java#L176



##########
crates/fluss/src/row/column.rs:
##########
@@ -533,4 +715,112 @@ mod tests {
             .unwrap()
         );
     }
+
+    fn make_struct_batch(

Review Comment:
   Is this for tests only? If so, can we move it under '#[test]'/mod?



##########
crates/fluss/src/row/column.rs:
##########
@@ -533,4 +715,112 @@ mod tests {
             .unwrap()
         );
     }
+
+    fn make_struct_batch(
+        field_name: &str,
+        child_fields: Fields,
+        child_arrays: Vec<Arc<dyn Array>>,
+        _num_rows: usize,
+    ) -> Arc<RecordBatch> {
+        let struct_array = StructArray::new(child_fields.clone(), 
child_arrays, None);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            field_name,
+            DataType::Struct(child_fields),
+            false,
+        )]));
+        Arc::new(
+            RecordBatch::try_new(schema, vec![Arc::new(struct_array)])
+                .expect("record batch"),
+        )
+    }
+
+    #[test]
+    fn columnar_row_reads_nested_row() {
+        // Build a RecordBatch with a Struct column: {i32, string}
+        let child_fields = Fields::from(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("s", DataType::Utf8, false),
+        ]);
+        let child_arrays: Vec<Arc<dyn Array>> = vec![
+            Arc::new(Int32Array::from(vec![42, 99])),
+            Arc::new(StringArray::from(vec!["hello", "world"])),
+        ];
+        let batch = make_struct_batch("nested", child_fields, child_arrays, 2);
+
+        let mut row = ColumnarRow::new(batch);
+
+        // row_id = 0
+        let nested = row.get_row(0).unwrap();
+        assert_eq!(nested.get_field_count(), 2);
+        assert_eq!(nested.get_int(0).unwrap(), 42);
+        assert_eq!(nested.get_string(1).unwrap(), "hello");
+
+        // row_id = 1
+        row.set_row_id(1);
+        let nested = row.get_row(0).unwrap();
+        assert_eq!(nested.get_int(0).unwrap(), 99);
+        assert_eq!(nested.get_string(1).unwrap(), "world");
+    }
+
+    #[test]
+    fn columnar_row_reads_deeply_nested_row() {
+        // Build: outer struct { i32, inner struct { string } }
+        let inner_fields = Fields::from(vec![Field::new("s", DataType::Utf8, 
false)]);
+        let inner_array = Arc::new(StructArray::new(
+            inner_fields.clone(),
+            vec![Arc::new(StringArray::from(vec!["deep", "deeper"])) as 
Arc<dyn Array>],
+            None,
+        ));
+
+        let outer_fields = Fields::from(vec![
+            Field::new("n", DataType::Int32, false),
+            Field::new("inner", DataType::Struct(inner_fields), false),
+        ]);
+        let outer_array = Arc::new(StructArray::new(
+            outer_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])) as Arc<dyn Array>,
+                inner_array as Arc<dyn Array>,
+            ],
+            None,
+        ));
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "outer",
+            DataType::Struct(outer_fields),
+            false,
+        )]));
+        let batch = Arc::new(
+            RecordBatch::try_new(schema, vec![outer_array]).expect("record 
batch"),
+        );
+
+        let row = ColumnarRow::new(batch);
+
+        // Access outer struct at column 0, row 0
+        let outer = row.get_row(0).unwrap();
+        assert_eq!(outer.get_int(0).unwrap(), 1);

Review Comment:
   Nit: should we assert the second row as well?



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -160,6 +160,18 @@ impl<'a> CompactedRowDeserializer<'a> {
                         (Datum::TimestampLtz(timestamp_ltz), next)
                     }
                 }
+                DataType::Row(row_type) => {
+                    let (nested_bytes, next) = reader.read_bytes(cursor);
+                    let nested_reader = CompactedRowReader::new(
+                        row_type.fields().len(),
+                        nested_bytes,
+                        0,
+                        nested_bytes.len(),
+                    );
+                    let nested_deser = 
CompactedRowDeserializer::new_from_owned(row_type.clone());

Review Comment:
   Can we use 'new', which borrows, instead? It seems like nested_deser does 
not live beyond the current scope anyway.



##########
crates/fluss/src/row/column.rs:
##########
@@ -209,6 +217,168 @@ impl ColumnarRow {
             }),
         }
     }
+
+    /// Extract a `GenericRow<'static>` from a column in the RecordBatch at 
the given row_id.
+    fn extract_struct_at(
+        batch: &RecordBatch,
+        pos: usize,
+        row_id: usize,
+    ) -> Result<GenericRow<'static>> {
+        let col = batch.column(pos);
+        Self::extract_struct_from_array(col.as_ref(), row_id)
+    }
+
+    /// Recursively extract a `GenericRow<'static>` from a `StructArray` at 
row_id.
+    fn extract_struct_from_array(array: &dyn Array, row_id: usize) -> 
Result<GenericRow<'static>> {
+        use arrow::array::StructArray;
+        let sa = array
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .ok_or_else(|| IllegalArgument {
+                message: format!("expected StructArray, got {:?}", 
array.data_type()),
+            })?;
+        let mut values = Vec::with_capacity(sa.num_columns());
+        for i in 0..sa.num_columns() {
+            let child = sa.column(i);
+            values.push(Self::arrow_value_to_datum(child.as_ref(), row_id)?);
+        }
+        Ok(GenericRow { values })
+    }
+
+    /// Convert a single element at `row_id` in an Arrow array to a 
`Datum<'static>`.
+    fn arrow_value_to_datum(array: &dyn Array, row_id: usize) -> 
Result<Datum<'static>> {
+        use arrow::array::{
+            BooleanArray, Decimal128Array, Float32Array, Float64Array, 
Int8Array, Int16Array,
+            Int32Array, Int64Array, Time32MillisecondArray, Time32SecondArray,
+            Time64MicrosecondArray, Time64NanosecondArray, 
TimestampMicrosecondArray,
+            TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
+        };
+        use crate::row::Decimal;
+
+        if array.is_null(row_id) {
+            return Ok(Datum::Null);
+        }
+
+        match array.data_type() {
+            ArrowDataType::Boolean => {
+                let a = array.as_any().downcast_ref::<BooleanArray>().unwrap();

Review Comment:
   Could we return an error with an appropriate message instead of unwrapping/panicking?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to