This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partitions_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2fd6bc0ffdfb57a3832f40c481a1a9991bb85f25
Author: numinex <[email protected]>
AuthorDate: Mon Mar 9 21:00:50 2026 +0100

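    partitions: make PartitionJournal2Impl generic over Storage

    Change `Storage::read` to take a length instead of a caller-provided
    buffer, add an in-memory `PartitionJournalMemStorage`, and index
    message offsets/timestamps with `BTreeMap`s so lookups resolve to the
    nearest stored op. Update the simulator's `MemStorage` to the new
    `read` signature.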
---
 core/journal/src/lib.rs        |   5 +-
 core/partitions/src/journal.rs | 220 ++++++++++++++++++++++++++++++-----------
 core/simulator/src/deps.rs     |   6 +-
 3 files changed, 171 insertions(+), 60 deletions(-)

diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index f1b4081dd..55a6b14fb 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -39,7 +39,10 @@ pub trait Storage {
     type Buffer;
 
     fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
-    fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output 
= Self::Buffer>;
+    // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` 
in order to support that.
+    // Maybe we should go back to passing in the `Buffer` again, but I am not 
sure how to handle it in the `Partitions Journal`, since we use in-memory impl
+    // which extracts the buffer out of the `Vec<Message>` and we don't need 
to allocate a new buffer.
+    fn read(&self, offset: usize, len: usize) -> impl Future<Output = 
Self::Buffer>;
 }
 
 pub trait JournalHandle {
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 5f964b46b..93d3733fc 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,14 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::BytesMut;
+use bytes::{Bytes, BytesMut};
 use iggy_common::{
     INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, 
PooledBuffer,
     header::{Operation, PrepareHeader},
     message::Message,
 };
 use journal::{Journal, Storage};
-use std::{cell::UnsafeCell, collections::HashMap};
+use std::{
+    cell::UnsafeCell,
+    collections::{BTreeMap, HashMap},
+};
+
+const ZERO_LEN: usize = 0;
 
 // TODO: Fix that, we need to figure out how to store the 
`IggyMessagesBatchSet`.
 /// No-op storage backend for the in-memory partition journal.
@@ -36,7 +41,7 @@ impl Storage for Noop {
         0
     }
 
-    async fn read(&self, _offset: usize, _buffer: ()) -> () { ()}
+    async fn read(&self, _offset: usize, _len: usize) -> () {}
 }
 
 /// Lookup key for querying messages from the journal.
@@ -99,29 +104,69 @@ where
     fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
 }
 
-pub struct PartitionJournal2Impl {
-    message_offset_to_op: UnsafeCell<HashMap<u64, usize>>,
-    timestamp_to_op: UnsafeCell<HashMap<u64, usize>>,
-    inner: UnsafeCell<JournalInner>,
+#[derive(Default)]
+pub struct PartitionJournalMemStorage {
+    entries: UnsafeCell<Vec<Bytes>>,
+    op_to_index: UnsafeCell<HashMap<u64, usize>>,
 }
 
-struct JournalInner {
-    set: Vec<Message<PrepareHeader>>,
-}
+impl Storage for PartitionJournalMemStorage {
+    type Buffer = Bytes;
 
-impl Default for PartitionJournal2Impl {
-    fn default() -> Self {
-        Self {
-            message_offset_to_op: UnsafeCell::new(HashMap::new()),
-            timestamp_to_op: UnsafeCell::new(HashMap::new()),
-            inner: UnsafeCell::new(JournalInner { set: Vec::new() }),
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let op = Message::<PrepareHeader>::from_bytes(buf.clone())
+            .ok()
+            .map(|message| message.header().op);
+
+        let entries = unsafe { &mut *self.entries.get() };
+        let index = entries.len();
+        entries.push(buf.clone());
+
+        if let Some(op) = op {
+            let op_to_index = unsafe { &mut *self.op_to_index.get() };
+            op_to_index.insert(op, index);
         }
+
+        buf.len()
+    }
+
+    async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+        let op = offset as u64;
+        let Some(index) = ({
+            let op_to_index = unsafe { &*self.op_to_index.get() };
+            op_to_index.get(&op).copied()
+        }) else {
+            return Bytes::new();
+        };
+
+        let entries = unsafe { &*self.entries.get() };
+        entries.get(index).cloned().unwrap_or_default()
     }
 }
 
-impl PartitionJournal2Impl {
+pub struct PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    inner: UnsafeCell<JournalInner<S>>,
+}
+
+struct JournalInner<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    storage: S,
+}
+
+impl<S> PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
     fn decode_send_messages_batch(body: bytes::Bytes) -> 
Option<IggyMessagesBatchMut> {
-        // TODO: This is bad, 
+        // TODO: This is bad,
         let mut body = body
             .try_into_mut()
             .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
@@ -155,10 +200,10 @@ impl PartitionJournal2Impl {
         Self::decode_send_messages_batch(message.body_bytes())
     }
 
-    fn messages_to_batch_set<'a>(messages: impl Iterator<Item = &'a 
Message<PrepareHeader>>) -> IggyMessagesBatchSet {
+    fn messages_to_batch_set(messages: Vec<Message<PrepareHeader>>) -> 
IggyMessagesBatchSet {
         let mut batch_set = IggyMessagesBatchSet::empty();
 
-        for message in messages {
+        for message in &messages {
             if let Some(batch) = Self::message_to_batch(message) {
                 batch_set.add_batch(batch);
             }
@@ -167,50 +212,108 @@ impl PartitionJournal2Impl {
         batch_set
     }
 
-    fn candidate_start_op(&self, query: &MessageLookup) -> usize {
+    fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
         match query {
             MessageLookup::Offset { offset, .. } => {
                 let offsets = unsafe { &*self.message_offset_to_op.get() };
-                offsets.get(offset).copied().unwrap_or_default()
+                offsets
+                    .range(..=*offset)
+                    .next_back()
+                    .or_else(|| offsets.range(*offset..).next())
+                    .map(|(_, op)| *op)
             }
             MessageLookup::Timestamp { timestamp, .. } => {
                 let timestamps = unsafe { &*self.timestamp_to_op.get() };
-                timestamps.get(timestamp).copied().unwrap_or_default()
+                timestamps
+                    .range(..=*timestamp)
+                    .next_back()
+                    .or_else(|| timestamps.range(*timestamp..).next())
+                    .map(|(_, op)| *op)
             }
         }
     }
 
-    fn messages_from_op(&self, start_op: usize) -> impl Iterator<Item = 
&Message<PrepareHeader>> {
-        let inner = unsafe { &*self.inner.get() };
-        inner.set.iter().skip(start_op)
+    async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
+        let offset = usize::try_from(op).ok()?;
+        let bytes = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.read(offset, ZERO_LEN).await
+        };
+
+        Some(Message::from_bytes(bytes).expect("invalid message bytes read 
from storage"))
+    }
+
+    async fn load_messages_from_storage(
+        &self,
+        start_op: u64,
+        count: u32,
+    ) -> Vec<Message<PrepareHeader>> {
+        if count == 0 {
+            return Vec::new();
+        }
+
+        let mut messages = Vec::new();
+        let mut loaded_messages = 0u32;
+        let mut op = start_op;
+
+        while loaded_messages < count {
+            let Some(message) = self.message_by_op(op).await else {
+                break;
+            };
+
+            if let Some(batch) = Self::message_to_batch(&message) {
+                loaded_messages = 
loaded_messages.saturating_add(batch.count());
+                messages.push(message);
+            }
+
+            op += 1;
+        }
+
+        messages
     }
 }
 
-impl Journal<Noop> for PartitionJournal2Impl {
+impl<S> Journal<S> for PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
     type Header = PrepareHeader;
     type Entry = Message<Self::Header>;
-    type HeaderRef<'a> = &'a Self::Header;
+    #[rustfmt::skip] // Scuffed formatter.
+    type HeaderRef<'a> = &'a Self::Header where S: 'a;
 
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
-        // TODO: Fixes
-        let inner = unsafe { &*self.inner.get() };
-        inner.set.get(idx).map(|msg| msg.header())
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.get(idx)
     }
 
     fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>> {
-        // TODO: Fixes
-        let prev_idx = header.op.saturating_sub(1) as usize;
-        let inner = unsafe { &*self.inner.get() };
-        inner.set.get(prev_idx).map(|msg| msg.header())
+        if header.op == 0 {
+            return None;
+        }
+
+        let prev_op = header.op - 1;
+        let headers = unsafe { &*self.headers.get() };
+        headers.get(prev_op as usize)
     }
 
     async fn append(&self, entry: Self::Entry) {
         let first_offset_and_timestamp = Self::message_to_batch(&entry)
             .and_then(|batch| Some((batch.first_offset()?, 
batch.first_timestamp()?)));
 
-        let inner = unsafe { &mut *self.inner.get() };
-        let op = inner.set.len();
-        inner.set.push(entry);
+        let header = *entry.header();
+        let op = header.op;
+
+        {
+            let headers = unsafe { &mut *self.headers.get() };
+            headers.push(header);
+        };
+
+        let bytes = entry.into_inner();
+        {
+            let inner = unsafe { &*self.inner.get() };
+            let _ = inner.storage.write(bytes).await;
+        }
 
         if let Some((offset, timestamp)) = first_offset_and_timestamp {
             let offsets = unsafe { &mut *self.message_offset_to_op.get() };
@@ -222,33 +325,38 @@ impl Journal<Noop> for PartitionJournal2Impl {
     }
 
     async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
-        let op = header.op as usize;
-        let inner = unsafe { &*self.inner.get() };
-        inner.set.get(op).cloned()
+        self.message_by_op(header.op).await
     }
 }
 
-impl PartitionJournal2<Noop> for PartitionJournal2Impl {
+impl<S> PartitionJournal2<S> for PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
     type Query = MessageLookup;
 
     async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
         let query = *query;
-            let start_op = self.candidate_start_op(&query);
-            let messages = self.messages_from_op(start_op);
-            let batch_set = Self::messages_to_batch_set(messages);
-
-            let result = match query {
-                MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(offset, count),
-                MessageLookup::Timestamp { timestamp, count } => {
-                    batch_set.get_by_timestamp(timestamp, count)
-                }
-            };
+        let start_op = self.candidate_start_op(&query)?;
+        let count = match query {
+            MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { 
count, .. } => count,
+        };
+
+        let messages = self.load_messages_from_storage(start_op, count).await;
 
-            if result.is_empty() {
-                None
-            } else {
-                Some(result)
+        let batch_set = Self::messages_to_batch_set(messages);
+        let result = match query {
+            MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(offset, count),
+            MessageLookup::Timestamp { timestamp, count } => {
+                batch_set.get_by_timestamp(timestamp, count)
             }
+        };
+
+        if result.is_empty() {
+            None
+        } else {
+            Some(result)
+        }
     }
 }
 
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 12e78bb05..316a54fe7 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -45,7 +45,8 @@ impl Storage for MemStorage {
         len
     }
 
-    async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> 
Self::Buffer {
+    async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
+        let buffer = vec![0; len];
         let data = self.data.borrow();
         let end = offset + buffer.len();
         if offset < data.len() && end <= data.len() {
@@ -106,8 +107,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
         let header = headers.get(&header.op)?;
         let offset = *offsets.get(&header.op)?;
 
-        let buffer = vec![0; header.size as usize];
-        let buffer = self.storage.read(offset, buffer).await;
+        let buffer = self.storage.read(offset, header.size as usize).await;
         let message =
             Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes 
should be valid");
         Some(message)

Reply via email to