charlesdong1991 commented on code in PR #443:
URL: https://github.com/apache/fluss-rust/pull/443#discussion_r2962380550


##########
crates/fluss/src/record/arrow.rs:
##########
@@ -144,8 +148,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1;
 
 pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
 
-// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead 
of a hard record cap.
-pub const DEFAULT_MAX_RECORD: i32 = 256;
+/// Initial capacity for Arrow column vectors (pre-allocation hint, not a 
record cap).
+/// Matching Java's `ArrowWriter.INITIAL_CAPACITY`.
+const INITIAL_ROW_CAPACITY: usize = 1024;
+
+/// Fraction of the allocated buffer used as the effective write limit.
+/// Matching Java's `ArrowWriter.BUFFER_USAGE_RATIO`.
+const BUFFER_USAGE_RATIO: f64 = 0.95;

Review Comment:
   I think Java uses `float` for this ratio, which corresponds to `f32`. Maybe 
there is no need to use `f64` here, so that we stay consistent with Java's behaviour?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -347,6 +384,11 @@ impl MemoryLogRecordsArrowBuilder {
             is_closed: false,
             arrow_record_batch_builder: arrow_batch_builder,
             arrow_compression_info,
+            write_limit: effective_limit,
+            ipc_overhead,
+            estimated_max_records_count: Cell::new(0),

Review Comment:
   Nit: functionally, I think the comparison in the `if` logic will return the 
same result, but when I read through the Java side, it is initialized to -1: 
https://github.com/apache/fluss/blob/0fa75ebf2147615b1abea1da7a4e7881d9044823/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java#L127
 



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -156,9 +165,24 @@ pub struct MemoryLogRecordsArrowBuilder {
     arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
     is_closed: bool,
     arrow_compression_info: ArrowCompressionInfo,
+    /// Effective write limit in bytes (after applying BUFFER_USAGE_RATIO).
+    write_limit: usize,
+    /// Pre-computed Arrow IPC overhead (metadata + body framing) for this 
schema.
+    /// Constant per schema+compression combination.
+    ipc_overhead: usize,
+    /// Estimated record count at which the next byte-size check should occur.
+    /// 0 means "unknown — check on the next append". Updated dynamically to
+    /// skip expensive `estimated_size_in_bytes()` calls on every append.
+    estimated_max_records_count: Cell<usize>,
+    /// Compression ratio estimator shared across batches for the same table.
+    compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
+    /// Snapshot of the compression ratio at batch creation time.
+    /// Matching Java's `ArrowWriter.estimatedCompressionRatio` which is
+    /// cached per batch and only refreshed on `reset()`.
+    estimated_compression_ratio: f32,
 }
 
-pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
+pub trait ArrowRecordBatchInnerBuilder: Send {

Review Comment:
   I wonder if there is a specific reason to remove `Sync`?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -451,12 +556,78 @@ impl MemoryLogRecordsArrowBuilder {
     }
 
     /// Get an estimate of the number of bytes written to the underlying 
buffer.
-    /// This includes the batch header size plus the estimated arrow data size.
+    /// Includes Fluss record batch header + Arrow IPC metadata + estimated
+    /// compressed body size.
     pub fn estimated_size_in_bytes(&self) -> usize {
-        RECORD_BATCH_HEADER_SIZE + 
self.arrow_record_batch_builder.estimated_size_in_bytes()
+        let body = self.arrow_record_batch_builder.estimated_size_in_bytes();
+        let estimated_body = self.estimated_compressed_size(body);
+        RECORD_BATCH_HEADER_SIZE + self.ipc_overhead + estimated_body
     }
 }
 
+/// Estimate the Arrow IPC overhead (metadata + body framing) for a given 
schema.
+///
+/// Serializes a 1-row RecordBatch with known data sizes, then subtracts the
+/// raw data contribution to isolate the fixed overhead: IPC message header,
+/// RecordBatch flatbuffer, and per-buffer alignment padding within the body.
+/// This overhead is constant for a given schema+compression combination.
+///
+/// Note: called once per batch creation. With writer pooling (see TODO above),
+/// this would be computed once per pooled writer and reused across batches.
+/// Analogous to Java's `ArrowUtils.estimateArrowMetadataLength()`.
+fn estimate_arrow_ipc_overhead(
+    schema: &SchemaRef,
+    compression: Option<CompressionType>,
+) -> Result<usize> {
+    use arrow::array::new_null_array;
+
+    // Create a 1-row batch of nulls. Null arrays have minimal, predictable
+    // data: no validity bitmap, no variable-length data, just fixed-width
+    // zero buffers. This lets us compute raw data size exactly.
+    let null_arrays: Vec<ArrayRef> = schema
+        .fields()
+        .iter()
+        .map(|field| new_null_array(field.data_type(), 1))
+        .collect();
+    let batch = RecordBatch::try_new(schema.clone(), null_arrays)?;
+
+    // Sum the raw buffer sizes — this is what buffer_size() would report.
+    let raw_data: usize = batch
+        .columns()
+        .iter()
+        .map(|col| {
+            col.to_data()
+                .buffers()
+                .iter()
+                .map(|buf| round_up_to_8(buf.len()))
+                .sum::<usize>()
+                // Validity buffer (null bitmap)
+                + col
+                    .nulls()
+                    .map_or(0, |n| round_up_to_8(n.buffer().len()))
+        })
+        .sum();
+
+    // Serialize the batch via IPC and measure total output.
+    let mut buf = vec![];
+    let write_option =
+        IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), 
compression);
+    let mut writer = StreamWriter::try_new_with_options(&mut buf, schema, 
write_option?)?;
+    let header_len = writer.get_ref().len();
+    writer.write(&batch)?;
+    let total_len = writer.get_ref().len();
+
+    // IPC overhead = total message size - raw data we put in.
+    let ipc_message_len = total_len - header_len;
+    Ok(ipc_message_len.saturating_sub(raw_data))
+}
+
+/// Round up to the next multiple of 8 (Arrow IPC alignment).
+#[inline]
+fn round_up_to_8(n: usize) -> usize {

Review Comment:
   Nit: this is very minor, but I think there is another fn called `round_up_8` 
defined in column_writer.rs. Can we maybe have a shared one, or rename this one 
to match? 😉 



##########
crates/fluss/src/client/write/batch.rs:
##########
@@ -587,4 +595,133 @@ mod tests {
             "estimated size {estimated_size} is not equal to actual size"
         );
     }
+
+    /// Appends rows until `is_full()` triggers, then builds the batch and
+    /// verifies the actual serialized size stays within the configured limit.
+    #[test]
+    fn test_arrow_batch_actual_size_within_limit() {

Review Comment:
   Minor: can we add a test (maybe inside this one) that verifies the feedback 
loop as well? That is, the first batch built with compression should update the 
shared estimator; after the build, the estimated ratio should have changed (to 
below 1), and a second batch using the same estimator should allow more records.
   
   Is my understanding correct?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -310,26 +334,34 @@ impl ArrowRecordBatchInnerBuilder for 
RowAppendRecordBatchBuilder {
     }
 
     fn is_full(&self) -> bool {
-        self.records_count() >= DEFAULT_MAX_RECORD
+        // Size-based fullness is handled by MemoryLogRecordsArrowBuilder,
+        // which accounts for metadata length and compression ratio.
+        false
     }
 
     fn estimated_size_in_bytes(&self) -> usize {
-        // Returns the uncompressed Arrow array memory size (same as Java's 
arrowWriter.estimatedSizeInBytes()).
-        // Note: This is the size before compression. After build(), the 
actual size may be smaller
-        // if compression is enabled.
-        self.column_writers
-            .iter()
-            .map(|writer| writer.finish_cloned().get_array_memory_size())
-            .sum()
+        // Returns the uncompressed Arrow IPC body size by reading buffer 
lengths
+        // directly from the builders — O(num_columns), zero allocation.
+        // Analogous to Java's `ArrowUtils.estimateArrowBodyLength()`.
+        // Java reads exact IPC buffer sizes from vectors; we read builder
+        // buffer lengths. The IPC framing overhead is accounted for
+        // separately by `ipc_overhead`.
+        self.column_writers.iter().map(|w| w.buffer_size()).sum()
     }
 }
 
+// TODO: Pool and reuse MemoryLogRecordsArrowBuilder instances per 
table/schema like
+// Java's ArrowWriterPool. Reused writers can seed 
`estimated_max_records_count` from
+// the previous batch (recordsCount / 2) for a warm start, avoiding the 
first-record
+// size check on every new batch.

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to