This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch benchmarks_0_6
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d69736575b23b9d709465aa0f0cf1b665de4ae53
Author: numinex <[email protected]>
AuthorDate: Wed Feb 25 11:54:14 2026 +0100

    improvements
---
 core/server/src/streaming/segments/messages/mod.rs | 63 +++++++++++-----------
 1 file changed, 33 insertions(+), 30 deletions(-)

diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index 8f5ba9c81..46ec05d42 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -22,16 +22,12 @@ mod messages_writer;
 use super::IggyMessagesBatchSet;
 use bytes::Bytes;
 use compio::{fs::File, io::AsyncWriteAtExt};
+use futures::future::join_all;
 use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer};
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
-/// Maximum number of IO vectors for a single writev() call.
-/// Linux typically has IOV_MAX=1024, but we use a conservative value to ensure
-/// cross-platform compatibility and leave room for any internal overhead.
-const MAX_IOV_COUNT: usize = 1024;
-
 /// Vectored write a batches of messages to file
 async fn write_batch(
     file: &File,
@@ -47,7 +43,7 @@ async fn write_batch(
                 (size + batch_size, bufs)
             });
 
-    write_vectored_chunked_pooled(file, position, buffers).await?;
+    write_parallel_pooled(file, position, buffers).await?;
     Ok(total_written)
 }
 
@@ -69,47 +65,54 @@ pub async fn write_batch_frozen(
         },
     );
 
-    write_vectored_chunked_bytes(file, position, buffers).await?;
+    write_parallel_bytes(file, position, buffers).await?;
     Ok(total_written)
 }
 
-/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
-async fn write_vectored_chunked_pooled(
+/// Writes PooledBuffer buffers to file in parallel using positional writes.
+async fn write_parallel_pooled(
     file: &File,
-    mut position: u64,
+    position: u64,
     buffers: Vec<PooledBuffer>,
 ) -> Result<(), IggyError> {
-    let mut iter = buffers.into_iter().peekable();
-
-    while iter.peek().is_some() {
-        let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect();
-        let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
+    let mut next_position = position;
+    let futures = buffers.into_iter().map(|buffer| {
+        let write_position = next_position;
+        next_position += buffer.len() as u64;
 
-        let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into();
-        result.map_err(|_| IggyError::CannotWriteToFile)?;
+        async move {
+            let (result, _) = (&*file).write_all_at(buffer, write_position).await.into();
+            result.map_err(|_| IggyError::CannotWriteToFile)
+        }
+    });
 
-        position += chunk_size as u64;
+    for result in join_all(futures).await {
+        result?;
     }
+
     Ok(())
 }
 
-/// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
-async fn write_vectored_chunked_bytes(
+/// Writes Bytes buffers to file in parallel using positional writes.
+async fn write_parallel_bytes(
     file: &File,
-    mut position: u64,
+    position: u64,
     buffers: Vec<Bytes>,
 ) -> Result<(), IggyError> {
-    for chunk in buffers.chunks(MAX_IOV_COUNT) {
-        let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
-        let chunk_vec: Vec<Bytes> = chunk.to_vec();
+    let mut next_position = position;
+    let futures = buffers.into_iter().map(|buffer| {
+        let write_position = next_position;
+        next_position += buffer.len() as u64;
 
-        let (result, _) = (&*file)
-            .write_vectored_all_at(chunk_vec, position)
-            .await
-            .into();
-        result.map_err(|_| IggyError::CannotWriteToFile)?;
+        async move {
+            let (result, _) = (&*file).write_all_at(buffer, write_position).await.into();
+            result.map_err(|_| IggyError::CannotWriteToFile)
+        }
+    });
 
-        position += chunk_size as u64;
+    for result in join_all(futures).await {
+        result?;
     }
+
     Ok(())
 }

Reply via email to