This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partitions_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2fd6bc0ffdfb57a3832f40c481a1a9991bb85f25 Author: numinex <[email protected]> AuthorDate: Mon Mar 9 21:00:50 2026 +0100 smth --- core/journal/src/lib.rs | 5 +- core/partitions/src/journal.rs | 220 ++++++++++++++++++++++++++++++----------- core/simulator/src/deps.rs | 6 +- 3 files changed, 171 insertions(+), 60 deletions(-) diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs index f1b4081dd..55a6b14fb 100644 --- a/core/journal/src/lib.rs +++ b/core/journal/src/lib.rs @@ -39,7 +39,10 @@ pub trait Storage { type Buffer; fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>; - fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>; + // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` in order to support that. + // Maybe we should go back to passing in the `Buffer` again, but I am not sure how to handle it in the `Partitions Journal`, since we use in-memory impl + // which extracts the buffer out of the `Vec<Message>` and we don't need to allocate a new buffer. + fn read(&self, offset: usize, len: usize) -> impl Future<Output = Self::Buffer>; } pub trait JournalHandle { diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 5f964b46b..93d3733fc 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,14 +15,19 @@ // specific language governing permissions and limitations // under the License. -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use iggy_common::{ INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer, header::{Operation, PrepareHeader}, message::Message, }; use journal::{Journal, Storage}; -use std::{cell::UnsafeCell, collections::HashMap}; +use std::{ + cell::UnsafeCell, + collections::{BTreeMap, HashMap}, +}; + +const ZERO_LEN: usize = 0; // TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. /// No-op storage backend for the in-memory partition journal. @@ -36,7 +41,7 @@ impl Storage for Noop { 0 } - async fn read(&self, _offset: usize, _buffer: ()) -> () { ()} + async fn read(&self, _offset: usize, _len: usize) -> () {} } /// Lookup key for querying messages from the journal. @@ -99,29 +104,69 @@ where fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessagesBatchSet>>; } -pub struct PartitionJournal2Impl { - message_offset_to_op: UnsafeCell<HashMap<u64, usize>>, - timestamp_to_op: UnsafeCell<HashMap<u64, usize>>, - inner: UnsafeCell<JournalInner>, +#[derive(Default)] +pub struct PartitionJournalMemStorage { + entries: UnsafeCell<Vec<Bytes>>, + op_to_index: UnsafeCell<HashMap<u64, usize>>, } -struct JournalInner { - set: Vec<Message<PrepareHeader>>, -} +impl Storage for PartitionJournalMemStorage { + type Buffer = Bytes; -impl Default for PartitionJournal2Impl { - fn default() -> Self { - Self { - message_offset_to_op: UnsafeCell::new(HashMap::new()), - timestamp_to_op: UnsafeCell::new(HashMap::new()), - inner: UnsafeCell::new(JournalInner { set: Vec::new() }), + async fn write(&self, buf: Self::Buffer) -> usize { + let op = Message::<PrepareHeader>::from_bytes(buf.clone()) + .ok() + .map(|message| message.header().op); + + let entries = unsafe { &mut *self.entries.get() }; + let index = entries.len(); + entries.push(buf.clone()); + + if let Some(op) = op { + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.insert(op, index); } + + buf.len() + } + + async fn read(&self, offset: usize, _len: usize) -> Self::Buffer { + let op = offset as u64; + let Some(index) = ({ + let op_to_index = unsafe { &*self.op_to_index.get() }; + op_to_index.get(&op).copied() + }) else { + return Bytes::new(); + }; + + let entries = unsafe { &*self.entries.get() }; + entries.get(index).cloned().unwrap_or_default() } } -impl PartitionJournal2Impl { +pub struct PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes>, +{ + message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>, + timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>, + headers: UnsafeCell<Vec<PrepareHeader>>, + inner: UnsafeCell<JournalInner<S>>, +} + +struct JournalInner<S> +where + S: Storage<Buffer = Bytes>, +{ + storage: S, +} + +impl<S> PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes>, +{ fn decode_send_messages_batch(body: bytes::Bytes) -> Option<IggyMessagesBatchMut> { - // TODO: This is bad, + // TODO: This is bad, let mut body = body .try_into_mut() .unwrap_or_else(|body| BytesMut::from(body.as_ref())); @@ -155,10 +200,10 @@ impl PartitionJournal2Impl { Self::decode_send_messages_batch(message.body_bytes()) } - fn messages_to_batch_set<'a>(messages: impl Iterator<Item = &'a Message<PrepareHeader>>) -> IggyMessagesBatchSet { + fn messages_to_batch_set(messages: Vec<Message<PrepareHeader>>) -> IggyMessagesBatchSet { let mut batch_set = IggyMessagesBatchSet::empty(); - for message in messages { + for message in &messages { if let Some(batch) = Self::message_to_batch(message) { batch_set.add_batch(batch); } @@ -167,50 +212,108 @@ impl PartitionJournal2Impl { batch_set } - fn candidate_start_op(&self, query: &MessageLookup) -> usize { + fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> { match query { MessageLookup::Offset { offset, .. } => { let offsets = unsafe { &*self.message_offset_to_op.get() }; - offsets.get(offset).copied().unwrap_or_default() + offsets + .range(..=*offset) + .next_back() + .or_else(|| offsets.range(*offset..).next()) + .map(|(_, op)| *op) } MessageLookup::Timestamp { timestamp, .. } => { let timestamps = unsafe { &*self.timestamp_to_op.get() }; - timestamps.get(timestamp).copied().unwrap_or_default() + timestamps + .range(..=*timestamp) + .next_back() + .or_else(|| timestamps.range(*timestamp..).next()) + .map(|(_, op)| *op) } } } - fn messages_from_op(&self, start_op: usize) -> impl Iterator<Item = &Message<PrepareHeader>> { - let inner = unsafe { &*self.inner.get() }; - inner.set.iter().skip(start_op) + async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> { + let offset = usize::try_from(op).ok()?; + let bytes = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.read(offset, ZERO_LEN).await + }; + + Some(Message::from_bytes(bytes).expect("invalid message bytes read from storage")) + } + + async fn load_messages_from_storage( + &self, + start_op: u64, + count: u32, + ) -> Vec<Message<PrepareHeader>> { + if count == 0 { + return Vec::new(); + } + + let mut messages = Vec::new(); + let mut loaded_messages = 0u32; + let mut op = start_op; + + while loaded_messages < count { + let Some(message) = self.message_by_op(op).await else { + break; + }; + + if let Some(batch) = Self::message_to_batch(&message) { + loaded_messages = loaded_messages.saturating_add(batch.count()); + messages.push(message); + } + + op += 1; + } + + messages } } -impl Journal<Noop> for PartitionJournal2Impl { +impl<S> Journal<S> for PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes>, +{ type Header = PrepareHeader; type Entry = Message<Self::Header>; - type HeaderRef<'a> = &'a Self::Header; + #[rustfmt::skip] // Scuffed formatter. + type HeaderRef<'a> = &'a Self::Header where S: 'a; fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> { - // TODO: Fixes - let inner = unsafe { &*self.inner.get() }; - inner.set.get(idx).map(|msg| msg.header()) + let headers = unsafe { &mut *self.headers.get() }; + headers.get(idx) } fn previous_header(&self, header: &Self::Header) -> Option<Self::HeaderRef<'_>> { - // TODO: Fixes - let prev_idx = header.op.saturating_sub(1) as usize; - let inner = unsafe { &*self.inner.get() }; - inner.set.get(prev_idx).map(|msg| msg.header()) + if header.op == 0 { + return None; + } + + let prev_op = header.op - 1; + let headers = unsafe { &*self.headers.get() }; + headers.get(prev_op as usize) } async fn append(&self, entry: Self::Entry) { let first_offset_and_timestamp = Self::message_to_batch(&entry) .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?))); - let inner = unsafe { &mut *self.inner.get() }; - let op = inner.set.len(); - inner.set.push(entry); + let header = *entry.header(); + let op = header.op; + + { + let headers = unsafe { &mut *self.headers.get() }; + headers.push(header); + }; + + let bytes = entry.into_inner(); + { + let inner = unsafe { &*self.inner.get() }; + let _ = inner.storage.write(bytes).await; + } if let Some((offset, timestamp)) = first_offset_and_timestamp { let offsets = unsafe { &mut *self.message_offset_to_op.get() }; @@ -222,33 +325,38 @@ impl Journal<Noop> for PartitionJournal2Impl { } async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> { - let op = header.op as usize; - let inner = unsafe { &*self.inner.get() }; - inner.set.get(op).cloned() + self.message_by_op(header.op).await } } -impl PartitionJournal2<Noop> for PartitionJournal2Impl { +impl<S> PartitionJournal2<S> for PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes>, +{ type Query = MessageLookup; async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> { let query = *query; - let start_op = self.candidate_start_op(&query); - let messages = self.messages_from_op(start_op); - let batch_set = Self::messages_to_batch_set(messages); - - let result = match query { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), - MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(timestamp, count) - } - }; + let start_op = self.candidate_start_op(&query)?; + let count = match query { + MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count, + }; + + let messages = self.load_messages_from_storage(start_op, count).await; - if result.is_empty() { - None - } else { - Some(result) + let batch_set = Self::messages_to_batch_set(messages); + let result = match query { + MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), + MessageLookup::Timestamp { timestamp, count } => { + batch_set.get_by_timestamp(timestamp, count) } + }; + + if result.is_empty() { + None + } else { + Some(result) + } } } diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 12e78bb05..316a54fe7 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -45,7 +45,8 @@ impl Storage for MemStorage { len } - async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> Self::Buffer { + async fn read(&self, offset: usize, len: usize) -> Self::Buffer { + let buffer = vec![0; len]; let data = self.data.borrow(); let end = offset + buffer.len(); if offset < data.len() && end <= data.len() { @@ -106,8 +107,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> { let header = headers.get(&header.op)?; let offset = *offsets.get(&header.op)?; - let buffer = vec![0; header.size as usize]; - let buffer = self.storage.read(offset, buffer).await; + let buffer = self.storage.read(offset, header.size as usize).await; let message = Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid"); Some(message)
