fresh-borzoni commented on code in PR #474:
URL: https://github.com/apache/fluss-rust/pull/474#discussion_r3025476824
##########
bindings/python/src/lib.rs:
##########
@@ -130,6 +130,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<WriteResultHandle>()?;
m.add_class::<DatabaseDescriptor>()?;
m.add_class::<DatabaseInfo>()?;
+ m.add_class::<DataType>()?;
Review Comment:
Should the .pyi stubs be updated for this as well?
##########
crates/fluss/src/row/column.rs:
##########
@@ -607,29 +615,50 @@ fn write_arrow_values_to_fluss_array(
)?;
}
DataType::Array(_) => {
- let list_arr =
Review Comment:
The downcast and from_arrow_type call were hoisted out of the loop before this
PR; now they run per element. The type doesn't change across iterations.
##########
bindings/python/src/metadata.rs:
##########
@@ -765,3 +766,116 @@ impl DatabaseInfo {
Self { __info: info }
}
}
+
+/// Represents a Fluss data type
+#[pyclass]
+#[derive(Clone)]
+pub struct DataType {
Review Comment:
Is it even used aside from tests?
##########
crates/fluss/src/row/binary_array.rs:
##########
@@ -621,6 +621,79 @@ impl FlussArrayWriter {
}
}
+impl crate::row::InternalRow for FlussArray {
+ fn get_field_count(&self) -> usize {
+ self.size()
+ }
+
+ fn is_null_at(&self, pos: usize) -> Result<bool> {
+ Ok(self.is_null_at(pos))
+ }
+
+ fn get_boolean(&self, pos: usize) -> Result<bool> {
+ self.get_boolean(pos)
+ }
+ fn get_byte(&self, pos: usize) -> Result<i8> {
+ self.get_byte(pos)
+ }
+ fn get_short(&self, pos: usize) -> Result<i16> {
+ self.get_short(pos)
+ }
+ fn get_int(&self, pos: usize) -> Result<i32> {
+ self.get_int(pos)
+ }
+ fn get_long(&self, pos: usize) -> Result<i64> {
+ self.get_long(pos)
+ }
+ fn get_float(&self, pos: usize) -> Result<f32> {
+ self.get_float(pos)
+ }
+ fn get_double(&self, pos: usize) -> Result<f64> {
+ self.get_double(pos)
+ }
+
+ fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
+ self.get_string(pos)
+ }
+
+ fn get_string(&self, pos: usize) -> Result<&str> {
+ self.get_string(pos)
+ }
+
+ fn get_decimal(&self, pos: usize, precision: usize, scale: usize) ->
Result<Decimal> {
+ self.get_decimal(pos, precision as u32, scale as u32)
+ }
+
+ fn get_date(&self, pos: usize) -> Result<Date> {
+ self.get_date(pos)
+ }
+ fn get_time(&self, pos: usize) -> Result<Time> {
+ self.get_time(pos)
+ }
+ fn get_timestamp_ntz(&self, pos: usize, precision: u32) ->
Result<TimestampNtz> {
+ self.get_timestamp_ntz(pos, precision)
+ }
+ fn get_timestamp_ltz(&self, pos: usize, precision: u32) ->
Result<TimestampLtz> {
+ self.get_timestamp_ltz(pos, precision)
+ }
+
+ fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
+ self.get_binary(pos)
+ }
+
+ fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
+ self.get_binary(pos)
+ }
+
+ fn get_array(&self, pos: usize) -> Result<FlussArray> {
+ self.get_array(pos)
+ }
+
+ fn as_encoded_bytes(&self, _write_format: crate::client::WriteFormat) ->
Option<&[u8]> {
+ Some(self.as_bytes())
Review Comment:
I don't think it's encoded
##########
bindings/python/src/table.rs:
##########
@@ -1370,6 +1370,20 @@ pub fn datum_to_python_value(
.map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ltz_to_python(py, ts)
}
+ DataType::Array(array_type) => {
Review Comment:
What about python_value_to_datum?
##########
bindings/python/test/test_log_table.py:
##########
@@ -755,3 +756,98 @@ def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
if arrow_table.num_rows > 0:
all_ids.extend(arrow_table.column("id").to_pylist())
return all_ids
+
+
+async def test_append_and_scan_with_array(connection, admin):
+ """Test appending and scanning with array columns."""
+ table_path = fluss.TablePath("fluss", "py_test_append_and_scan_with_array")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("tags", pa.list_(pa.string())),
+ pa.field("scores", pa.list_(pa.int32())),
+ ]
+ )
+ schema = fluss.Schema(pa_schema)
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ # Batch 1: Testing both standard and large lists
+ batch1 = pa.RecordBatch.from_arrays(
+ [
+ pa.array([1, 2], type=pa.int32()),
+ pa.array([["a", "b"], ["c"]], type=pa.list_(pa.string())),
+ pa.array([[10, 20], [30]], type=pa.list_(pa.int32())),
+ ],
+ schema=pa_schema,
+ )
+ append_writer.write_arrow_batch(batch1)
+ await append_writer.flush()
+
+ # Verify via LogScanner (record-by-record)
+ scanner = await table.new_scan().create_log_scanner()
+ scanner.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
+ records = _poll_records(scanner, expected_count=2)
+
+ assert len(records) == 2
+ records.sort(key=lambda r: r.row["id"])
+
+ assert records[0].row["tags"] == ["a", "b"]
+ assert records[0].row["scores"] == [10, 20]
+ assert records[1].row["tags"] == ["c"]
+ assert records[1].row["scores"] == [30]
+
+ # Verify via to_arrow (batch-based)
+ scanner2 = await table.new_scan().create_record_batch_log_scanner()
+ scanner2.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
+ result_table = scanner2.to_arrow()
+
+ assert result_table.num_rows == 2
+ assert result_table.column("tags").to_pylist() == [["a", "b"], ["c"]]
+ assert result_table.column("scores").to_pylist() == [[10, 20], [30]]
+
+@pytest.mark.skip(reason="FixedSizeList support requires server-side updates
(≥0.9.1). "
Review Comment:
Why do we skip this?
The server in the Fluss main repo accepts only ListVector, so this will fail
right now; I only see FixedSizeList in the Lance tiering part.
##########
bindings/python/test/test_log_table.py:
##########
@@ -755,3 +756,98 @@ def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
if arrow_table.num_rows > 0:
all_ids.extend(arrow_table.column("id").to_pylist())
return all_ids
+
+
+async def test_append_and_scan_with_array(connection, admin):
+ """Test appending and scanning with array columns."""
+ table_path = fluss.TablePath("fluss", "py_test_append_and_scan_with_array")
+ await admin.drop_table(table_path, ignore_if_not_exists=True)
+
+ pa_schema = pa.schema(
+ [
+ pa.field("id", pa.int32()),
+ pa.field("tags", pa.list_(pa.string())),
+ pa.field("scores", pa.list_(pa.int32())),
+ ]
+ )
+ schema = fluss.Schema(pa_schema)
+ table_descriptor = fluss.TableDescriptor(schema)
+ await admin.create_table(table_path, table_descriptor,
ignore_if_exists=False)
+
+ table = await connection.get_table(table_path)
+ append_writer = table.new_append().create_writer()
+
+ # Batch 1: Testing both standard and large lists
+ batch1 = pa.RecordBatch.from_arrays(
Review Comment:
Shall we test null values inside arrays as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]