fresh-borzoni commented on code in PR #443:
URL: https://github.com/apache/fluss-rust/pull/443#discussion_r2963452329


##########
crates/fluss/src/record/arrow.rs:
##########
@@ -451,12 +556,78 @@ impl MemoryLogRecordsArrowBuilder {
     }
 
     /// Get an estimate of the number of bytes written to the underlying 
buffer.
-    /// This includes the batch header size plus the estimated arrow data size.
+    /// Includes Fluss record batch header + Arrow IPC metadata + estimated
+    /// compressed body size.
     pub fn estimated_size_in_bytes(&self) -> usize {
-        RECORD_BATCH_HEADER_SIZE + 
self.arrow_record_batch_builder.estimated_size_in_bytes()
+        let body = self.arrow_record_batch_builder.estimated_size_in_bytes();
+        let estimated_body = self.estimated_compressed_size(body);
+        RECORD_BATCH_HEADER_SIZE + self.ipc_overhead + estimated_body
     }
 }
 
+/// Estimate the Arrow IPC overhead (metadata + body framing) for a given 
schema.
+///
+/// Serializes a 1-row RecordBatch with known data sizes, then subtracts the
+/// raw data contribution to isolate the fixed overhead: IPC message header,
+/// RecordBatch flatbuffer, and per-buffer alignment padding within the body.
+/// This overhead is constant for a given schema+compression combination.
+///
+/// Note: called once per batch creation. With writer pooling (see TODO above),
+/// this would be computed once per pooled writer and reused across batches.
+/// Analogous to Java's `ArrowUtils.estimateArrowMetadataLength()`.
+fn estimate_arrow_ipc_overhead(
+    schema: &SchemaRef,
+    compression: Option<CompressionType>,
+) -> Result<usize> {
+    use arrow::array::new_null_array;
+
+    // Create a 1-row batch of nulls. Null arrays have minimal, predictable
+    // data: no validity bitmap, no variable-length data, just fixed-width
+    // zero buffers. This lets us compute raw data size exactly.
+    let null_arrays: Vec<ArrayRef> = schema
+        .fields()
+        .iter()
+        .map(|field| new_null_array(field.data_type(), 1))
+        .collect();
+    let batch = RecordBatch::try_new(schema.clone(), null_arrays)?;
+
+    // Sum the raw buffer sizes — this is what buffer_size() would report.
+    let raw_data: usize = batch
+        .columns()
+        .iter()
+        .map(|col| {
+            col.to_data()
+                .buffers()
+                .iter()
+                .map(|buf| round_up_to_8(buf.len()))
+                .sum::<usize>()
+                // Validity buffer (null bitmap)
+                + col
+                    .nulls()
+                    .map_or(0, |n| round_up_to_8(n.buffer().len()))
+        })
+        .sum();
+
+    // Serialize the batch via IPC and measure total output.
+    let mut buf = vec![];
+    let write_option =
+        IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), 
compression);
+    let mut writer = StreamWriter::try_new_with_options(&mut buf, schema, 
write_option?)?;
+    let header_len = writer.get_ref().len();
+    writer.write(&batch)?;
+    let total_len = writer.get_ref().len();
+
+    // IPC overhead = total message size - raw data we put in.
+    let ipc_message_len = total_len - header_len;
+    Ok(ipc_message_len.saturating_sub(raw_data))
+}
+
+/// Round up to the next multiple of 8 (Arrow IPC alignment).
+#[inline]
+fn round_up_to_8(n: usize) -> usize {

Review Comment:
   Changed to use the function from column_writer, good spot 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to