This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partitions_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f17ae7ca489a56f0861b6f622c2de95d07f92bbc
Author: numinex <[email protected]>
AuthorDate: Mon Mar 9 13:09:04 2026 +0100

    temp
---
 core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md | 141 ++++++++++++++++
 core/partitions/src/journal.rs                     | 182 +++++++++++++++++++--
 2 files changed, 312 insertions(+), 11 deletions(-)

diff --git a/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md 
b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md
new file mode 100644
index 000000000..ec42f4d7d
--- /dev/null
+++ b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md
@@ -0,0 +1,141 @@
+# PartitionJournal2 Storage Proposal
+
+## Objective
+Use `journal::Storage` as the actual backing store for serialized prepare entries (`Bytes`) and decode to `Message<PrepareHeader>` on read.
+
+## Current Problem
+- `PartitionJournal2Impl` currently stores entries directly in:
+  - `UnsafeCell<Vec<Message<PrepareHeader>>>`
+- `Noop` storage is unused for real data.
+- The old `Buffer = Entry` idea is too rigid for this path.
+- `Storage::read(&self, offset, buffer)` still requires a fallback buffer argument.
+
+## Design Direction
+
+### 1. Use serialized buffer (`Bytes`) in storage
+For `PartitionJournal2`, enforce:
+
+```rust
+S: Storage<Buffer = bytes::Bytes>
+```
+
+Journal entry remains:
+
+```rust
+type Entry = Message<PrepareHeader>;
+```
+
+Conversion boundary:
+- write path: `Message<PrepareHeader> -> Bytes` (serialize/store)
+- read path: `Bytes -> Message<PrepareHeader>` via `Message::from_bytes`
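The conversion boundary above can be sketched as a round-trip. This is a minimal illustration with hypothetical stand-in types (a flat `Message` with a `u64` op header), not the real `Message<PrepareHeader>` API:

```rust
// Sketch of the conversion boundary with hypothetical stand-ins for the real
// iggy types; only the serialize-on-write / decode-on-read idea matters here.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PrepareHeader {
    pub op: u64,
}

#[derive(Debug, PartialEq)]
pub struct Message {
    pub header: PrepareHeader,
    pub body: Vec<u8>,
}

impl Message {
    // Write path: Message -> raw bytes (op as little-endian u64, then body).
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut out = self.header.op.to_le_bytes().to_vec();
        out.extend_from_slice(&self.body);
        out
    }

    // Read path: raw bytes -> Message, decoded at the journal boundary.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 8 {
            return None;
        }
        let (op_bytes, body) = bytes.split_at(8);
        Some(Self {
            header: PrepareHeader {
                op: u64::from_le_bytes(op_bytes.try_into().ok()?),
            },
            body: body.to_vec(),
        })
    }
}
```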
+
+### 2. Replace `Noop` with in-memory prepare-message storage
+Introduce a dedicated storage:
+
+```rust
+pub struct InMemoryPrepareStorage {
+    entries: UnsafeCell<Vec<bytes::Bytes>>,
+}
+```
+
+Behavior:
+- `write(bytes)` appends serialized message bytes to `entries`.
+- `read(offset, ...)` treats `offset` as `op_number` and returns that op entry.
+
+This keeps storage raw and simple, while typed decoding happens at the journal boundary.
+
+### 2.1 `offset` semantics for this journal: `offset == op_number`
+For `PartitionJournal2`, define:
+- `Storage::read(offset, ...)` where `offset` is the VSR `PrepareHeader.op` (op number), not a byte offset.
+- The journal append path stores entries in op order, so op lookup is effectively O(1) via an index map (or a direct vec index if ops are contiguous).
+
+Implementation detail:
+- Maintain `op_to_index: HashMap<u64, usize>` (or rely on contiguous `op` if guaranteed).
+- On `append(entry)`, cache `op_to_index.insert(entry.header().op, vec_index)`.
+- On `entry(header)`, call storage read using `header.op as usize` or the map-resolved index.
+
+### 3. Make `PartitionJournal2Impl` storage-backed
+Refactor `PartitionJournal2Impl` to own a storage instance:
+
+```rust
+pub struct PartitionJournal2Impl<S: Storage<Buffer = bytes::Bytes>> {
+    storage: S,
+    // metadata/indexes only
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    message_offset_to_op: UnsafeCell<HashMap<u64, usize>>,
+    timestamp_to_op: UnsafeCell<HashMap<u64, usize>>,
+}
+```
+
+Responsibilities split:
+- `storage`: source of truth for serialized entry bytes
+- `headers`: lightweight header cache to satisfy `header()` / `previous_header()` reference semantics
+- maps: query acceleration for `get`
+
+### 4. Journal method behavior with storage
+- `append(entry)`:
+  - decode send-messages info for maps
+  - serialize: `let bytes = entry.into_inner()` (or equivalent)
+  - `storage.write(bytes)`
+  - push `*entry.header()` into `headers`
+  - cache `op_number -> storage position`
+- `entry(header)`:
+  - resolve by `op_number` (from `header.op`)
+  - fetch bytes from storage via read
+  - decode immediately: `Message::<PrepareHeader>::from_bytes(bytes)`
+- `header(idx)` and `previous_header(header)`:
+  - use `headers` vector (no full entry decode needed)
+
+### 5. `get` path stays batch-conversion based
+`get` still performs:
+- collect candidate bytes from storage
+- decode into `Message<PrepareHeader>`
+- convert `Vec<Message<PrepareHeader>> -> IggyMessagesBatchSet`
+- apply `MessageLookup` filter (`get_by_offset` / `get_by_timestamp`)
+
+The existing conversion helpers remain valid.
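For reference, the body framing that `decode_send_messages_batch` (in the `journal.rs` diff below) parses can be sketched without the iggy types: a little-endian `u32` message count, `count * INDEX_SIZE` bytes of indexes, then the message payload. The `INDEX_SIZE = 16` value here is an assumption for illustration only:

```rust
// Dependency-free sketch of the send-messages body framing:
// [count: u32 LE][count * INDEX_SIZE index bytes][message bytes].
// INDEX_SIZE = 16 is an assumed value, not the real iggy constant.
const INDEX_SIZE: usize = 16;

fn split_send_messages_body(body: &[u8]) -> Option<(u32, &[u8], &[u8])> {
    if body.len() < 4 {
        return None;
    }
    let (count_bytes, rest) = body.split_at(4);
    let count = u32::from_le_bytes(count_bytes.try_into().ok()?);
    let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
    if rest.len() < indexes_len {
        return None;
    }
    let (indexes, messages) = rest.split_at(indexes_len);
    Some((count, indexes, messages))
}
```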
+
+## Storage Trait Consideration
+
+Current trait:
+
+```rust
+fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
+```
+
+For typed object lookups this is awkward. Recommended adjustment:
+
+```rust
+fn read(&self, offset: usize) -> impl Future<Output = Option<Self::Buffer>>;
+```
+
+Why:
+- avoids fake fallback buffer construction
+- maps naturally to indexed storage
+- still works for byte storage by returning `Option<Bytes>`
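A sketch of the recommended shape, with async elided for brevity (the real trait would return `impl Future<Output = Option<Self::Buffer>>`); `VecStorage` is a hypothetical backend for illustration:

```rust
// Sketch of the Option-returning read. Sync here; the real trait is async.
trait Storage {
    type Buffer;
    fn read(&self, offset: usize) -> Option<Self::Buffer>;
}

struct VecStorage {
    entries: Vec<Vec<u8>>,
}

impl Storage for VecStorage {
    type Buffer = Vec<u8>;

    // Out-of-range reads become `None` instead of echoing a caller-supplied
    // fallback buffer.
    fn read(&self, offset: usize) -> Option<Vec<u8>> {
        self.entries.get(offset).cloned()
    }
}
```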
+
+If a trait-wide change is too large right now, keep the current signature temporarily and treat the incoming `buffer` only as a fallback for out-of-range reads.
+
+For this phase, no trait change is required: just interpret the existing `offset` argument as `op_number` in `PartitionJournal2` storage.
+
+## Proposed File-Level Changes
+- `core/partitions/src/journal.rs`
+  - add `InMemoryPrepareStorage`
+  - make `PartitionJournal2Impl` generic over storage (or concrete to `InMemoryPrepareStorage`)
+  - remove direct `inner.set: Vec<Message<PrepareHeader>>` as primary store
+  - keep lightweight header metadata cache
+  - keep/adjust current lookup maps
+- `core/partitions/src/iggy_partition.rs` (only if/when wiring `PartitionJournal2` into the partition log)
+  - replace `Noop` for this path with `InMemoryPrepareStorage`
+
+## Migration Sequence (Low Risk)
+1. Add `InMemoryPrepareStorage` without removing existing fields.
+2. Mirror writes to both old `inner.set` and storage.
+3. Switch reads (`entry`, `get`) to storage-backed path.
+4. Remove old `inner.set` once parity is confirmed.
+5. Optionally evolve `Storage::read` signature in a separate PR.
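Steps 2-4 above can be sketched as a dual-write with a parity gate. Both stores are plain `Vec<Vec<u8>>` stand-ins here, and `MirroredJournal` is an illustrative name, not existing code:

```rust
// Sketch of the migration: mirror writes into both the old in-place vector
// and the new storage, and check parity before removing the old path.
#[derive(Default)]
struct MirroredJournal {
    old_set: Vec<Vec<u8>>, // existing `inner.set`-style store
    storage: Vec<Vec<u8>>, // stand-in for the new storage backend
}

impl MirroredJournal {
    // Step 2: every append goes to both stores.
    fn append(&mut self, bytes: Vec<u8>) {
        self.old_set.push(bytes.clone());
        self.storage.push(bytes);
    }

    // Step 4 gate: only drop `old_set` once both stores agree.
    fn parity(&self) -> bool {
        self.old_set == self.storage
    }
}
```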
+
+## Expected Outcome
+- `Storage` is no longer a no-op for `PartitionJournal2`.
+- Storage buffer is raw serialized data (`Bytes`), and decoding to `Message<PrepareHeader>` happens at the read boundary.
+- The in-memory backend stays simple (`UnsafeCell<Vec<Bytes>>`) and aligned with your proposed flow.
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index ab281e940..5f964b46b 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,9 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::BytesMut;
+use iggy_common::{
+    INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer,
+    header::{Operation, PrepareHeader},
+    message::Message,
+};
 use journal::{Journal, Storage};
-use std::cell::UnsafeCell;
+use std::{cell::UnsafeCell, collections::HashMap};
 
 // TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`.
 /// No-op storage backend for the in-memory partition journal.
@@ -31,7 +36,7 @@ impl Storage for Noop {
         0
     }
 
-    async fn read(&self, _offset: usize, _buffer: ()) {}
+    async fn read(&self, _offset: usize, _buffer: ()) -> () { () }
 }
 
 /// Lookup key for querying messages from the journal.
@@ -49,14 +54,7 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an `IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
+// [LEGACY]
 pub struct PartitionJournal {
     batch_set: UnsafeCell<IggyMessagesBatchSet>,
 }
@@ -92,6 +90,168 @@ impl std::fmt::Debug for PartitionJournal {
     }
 }
 
+pub trait PartitionJournal2<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessagesBatchSet>>;
+}
+
+pub struct PartitionJournal2Impl {
+    message_offset_to_op: UnsafeCell<HashMap<u64, usize>>,
+    timestamp_to_op: UnsafeCell<HashMap<u64, usize>>,
+    inner: UnsafeCell<JournalInner>,
+}
+
+struct JournalInner {
+    set: Vec<Message<PrepareHeader>>,
+}
+
+impl Default for PartitionJournal2Impl {
+    fn default() -> Self {
+        Self {
+            message_offset_to_op: UnsafeCell::new(HashMap::new()),
+            timestamp_to_op: UnsafeCell::new(HashMap::new()),
+            inner: UnsafeCell::new(JournalInner { set: Vec::new() }),
+        }
+    }
+}
+
+impl PartitionJournal2Impl {
+    fn decode_send_messages_batch(body: bytes::Bytes) -> Option<IggyMessagesBatchMut> {
+        // TODO: This is bad: it falls back to copying the body when the `Bytes` is shared.
+        let mut body = body
+            .try_into_mut()
+            .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+
+        if body.len() < 4 {
+            return None;
+        }
+
+        let count_bytes = body.split_to(4);
+        let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
+        let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+
+        if body.len() < indexes_len {
+            return None;
+        }
+
+        let indexes_bytes = body.split_to(indexes_len);
+        let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
+        let messages = PooledBuffer::from(body);
+
+        Some(IggyMessagesBatchMut::from_indexes_and_messages(
+            indexes, messages,
+        ))
+    }
+
+    fn message_to_batch(message: &Message<PrepareHeader>) -> Option<IggyMessagesBatchMut> {
+        if message.header().operation != Operation::SendMessages {
+            return None;
+        }
+
+        Self::decode_send_messages_batch(message.body_bytes())
+    }
+
+    fn messages_to_batch_set<'a>(messages: impl Iterator<Item = &'a Message<PrepareHeader>>) -> IggyMessagesBatchSet {
+        let mut batch_set = IggyMessagesBatchSet::empty();
+
+        for message in messages {
+            if let Some(batch) = Self::message_to_batch(message) {
+                batch_set.add_batch(batch);
+            }
+        }
+
+        batch_set
+    }
+
+    fn candidate_start_op(&self, query: &MessageLookup) -> usize {
+        match query {
+            MessageLookup::Offset { offset, .. } => {
+                let offsets = unsafe { &*self.message_offset_to_op.get() };
+                offsets.get(offset).copied().unwrap_or_default()
+            }
+            MessageLookup::Timestamp { timestamp, .. } => {
+                let timestamps = unsafe { &*self.timestamp_to_op.get() };
+                timestamps.get(timestamp).copied().unwrap_or_default()
+            }
+        }
+    }
+
+    fn messages_from_op(&self, start_op: usize) -> impl Iterator<Item = &Message<PrepareHeader>> {
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.iter().skip(start_op)
+    }
+}
+
+impl Journal<Noop> for PartitionJournal2Impl {
+    type Header = PrepareHeader;
+    type Entry = Message<Self::Header>;
+    type HeaderRef<'a> = &'a Self::Header;
+
+    fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+        // TODO: Fixes
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(idx).map(|msg| msg.header())
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> Option<Self::HeaderRef<'_>> {
+        // TODO: Fixes
+        let prev_idx = header.op.saturating_sub(1) as usize;
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(prev_idx).map(|msg| msg.header())
+    }
+
+    async fn append(&self, entry: Self::Entry) {
+        let first_offset_and_timestamp = Self::message_to_batch(&entry)
+            .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?)));
+
+        let inner = unsafe { &mut *self.inner.get() };
+        let op = inner.set.len();
+        inner.set.push(entry);
+
+        if let Some((offset, timestamp)) = first_offset_and_timestamp {
+            let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+            offsets.insert(offset, op);
+
+            let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+            timestamps.insert(timestamp, op);
+        }
+    }
+
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        let op = header.op as usize;
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(op).cloned()
+    }
+}
+
+impl PartitionJournal2<Noop> for PartitionJournal2Impl {
+    type Query = MessageLookup;
+
+    async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+        let query = *query;
+        let start_op = self.candidate_start_op(&query);
+        let messages = self.messages_from_op(start_op);
+        let batch_set = Self::messages_to_batch_set(messages);
+
+        let result = match query {
+            MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count),
+            MessageLookup::Timestamp { timestamp, count } => {
+                batch_set.get_by_timestamp(timestamp, count)
+            }
+        };
+
+        if result.is_empty() {
+            None
+        } else {
+            Some(result)
+        }
+    }
+}
+
 impl Journal<Noop> for PartitionJournal {
     type Header = MessageLookup;
     type Entry = IggyMessagesBatchMut;
