This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch benchmarks_0_6 in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d69736575b23b9d709465aa0f0cf1b665de4ae53 Author: numinex <[email protected]> AuthorDate: Wed Feb 25 11:54:14 2026 +0100 improvements --- core/server/src/streaming/segments/messages/mod.rs | 63 +++++++++++----------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index 8f5ba9c81..46ec05d42 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -22,16 +22,12 @@ mod messages_writer; use super::IggyMessagesBatchSet; use bytes::Bytes; use compio::{fs::File, io::AsyncWriteAtExt}; +use futures::future::join_all; use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer}; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; -/// Maximum number of IO vectors for a single writev() call. -/// Linux typically has IOV_MAX=1024, but we use a conservative value to ensure -/// cross-platform compatibility and leave room for any internal overhead. -const MAX_IOV_COUNT: usize = 1024; - /// Vectored write a batches of messages to file async fn write_batch( file: &File, @@ -47,7 +43,7 @@ async fn write_batch( (size + batch_size, bufs) }); - write_vectored_chunked_pooled(file, position, buffers).await?; + write_parallel_pooled(file, position, buffers).await?; Ok(total_written) } @@ -69,47 +65,54 @@ pub async fn write_batch_frozen( }, ); - write_vectored_chunked_bytes(file, position, buffers).await?; + write_parallel_bytes(file, position, buffers).await?; Ok(total_written) } -/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits. -async fn write_vectored_chunked_pooled( +/// Writes PooledBuffer buffers to file in parallel using positional writes. +async fn write_parallel_pooled( file: &File, - mut position: u64, + position: u64, buffers: Vec<PooledBuffer>, ) -> Result<(), IggyError> { - let mut iter = buffers.into_iter().peekable(); - - while iter.peek().is_some() { - let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect(); - let chunk_size: usize = chunk.iter().map(|b| b.len()).sum(); + let mut next_position = position; + let futures = buffers.into_iter().map(|buffer| { + let write_position = next_position; + next_position += buffer.len() as u64; - let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into(); - result.map_err(|_| IggyError::CannotWriteToFile)?; + async move { + let (result, _) = (&*file).write_all_at(buffer, write_position).await.into(); + result.map_err(|_| IggyError::CannotWriteToFile) + } + }); - position += chunk_size as u64; + for result in join_all(futures).await { + result?; } + Ok(()) } -/// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits. -async fn write_vectored_chunked_bytes( +/// Writes Bytes buffers to file in parallel using positional writes. +async fn write_parallel_bytes( file: &File, - mut position: u64, + position: u64, buffers: Vec<Bytes>, ) -> Result<(), IggyError> { - for chunk in buffers.chunks(MAX_IOV_COUNT) { - let chunk_size: usize = chunk.iter().map(|b| b.len()).sum(); - let chunk_vec: Vec<Bytes> = chunk.to_vec(); + let mut next_position = position; + let futures = buffers.into_iter().map(|buffer| { + let write_position = next_position; + next_position += buffer.len() as u64; - let (result, _) = (&*file) - .write_vectored_all_at(chunk_vec, position) - .await - .into(); - result.map_err(|_| IggyError::CannotWriteToFile)?; + async move { + let (result, _) = (&*file).write_all_at(buffer, write_position).await.into(); + result.map_err(|_| IggyError::CannotWriteToFile) + } + }); - position += chunk_size as u64; + for result in join_all(futures).await { + result?; } + Ok(()) }
