This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partitions_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit aff103e3373dc1f127aa71468b9820476ac4cf25 Author: numinex <[email protected]> AuthorDate: Tue Mar 10 10:01:03 2026 +0100 fixes --- core/partitions/src/iggy_partition.rs | 51 +++++++- core/partitions/src/iggy_partitions.rs | 54 +++------ core/partitions/src/journal.rs | 211 +++++++++++++-------------------- core/partitions/src/lib.rs | 35 +++++- core/simulator/src/deps.rs | 2 +- 5 files changed, 177 insertions(+), 176 deletions(-) diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 49c820230..bf309a7c8 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::journal::{Noop, PartitionJournal}; +use crate::journal::{PartitionJournal2Impl, PartitionJournalMemStorage}; use crate::log::SegmentedLog; -use crate::{AppendResult, Partition}; +use crate::{AppendResult, Partition, decode_send_messages_batch}; use iggy_common::{ ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut, IggyTimestamp, PartitionStats, + header::{Operation, PrepareHeader}, + message::Message, }; use journal::Journal as _; use std::sync::Arc; @@ -30,7 +32,8 @@ use tokio::sync::Mutex as TokioMutex; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. #[derive(Debug)] pub struct IggyPartition { - pub log: SegmentedLog<PartitionJournal, Noop>, + pub log: + SegmentedLog<PartitionJournal2Impl<PartitionJournalMemStorage>, PartitionJournalMemStorage>, /// Committed offset — advanced only after quorum ack. pub offset: Arc<AtomicU64>, /// Dirty offset — advanced on every prepare (before commit). @@ -46,6 +49,35 @@ pub struct IggyPartition { } impl IggyPartition { + fn prepare_message_from_batch( + mut header: PrepareHeader, + batch: &IggyMessagesBatchMut, + ) -> Message<PrepareHeader> { + let indexes = batch.indexes(); + let count = batch.count(); + let body_len = 4 + indexes.len() + batch.len(); + let total_size = std::mem::size_of::<PrepareHeader>() + body_len; + header.size = total_size as u32; + + let message = Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| { + *new = header; + }); + + let mut bytes = message + .into_inner() + .try_into_mut() + .expect("prepare_message_from_batch: expected unique bytes buffer"); + let header_size = std::mem::size_of::<PrepareHeader>(); + bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes()); + let mut position = header_size + 4; + bytes[position..position + indexes.len()].copy_from_slice(indexes); + position += indexes.len(); + bytes[position..position + batch.len()].copy_from_slice(batch); + + Message::<PrepareHeader>::from_bytes(bytes.freeze()) + .expect("prepare_message_from_batch: invalid prepared message bytes") + } + pub fn new(stats: Arc<PartitionStats>) -> Self { Self { log: SegmentedLog::default(), @@ -65,8 +97,16 @@ impl IggyPartition { impl Partition for IggyPartition { async fn append_messages( &mut self, - mut batch: IggyMessagesBatchMut, + message: Message<PrepareHeader>, ) -> Result<AppendResult, IggyError> { + let header = *message.header(); + if header.operation != Operation::SendMessages { + return Err(IggyError::CannotAppendMessage); + } + + let mut batch = decode_send_messages_batch(message.body_bytes()) + .ok_or(IggyError::CannotAppendMessage)?; + if batch.count() == 0 { return Ok(AppendResult::new(0, 0, 0)); } @@ -116,7 +156,8 @@ impl Partition for IggyPartition { journal.info.end_timestamp = ts; } - journal.inner.append(batch).await; + let message = Self::prepare_message_from_batch(header, &batch); + journal.inner.append(message).await; Ok(AppendResult::new( dirty_offset, diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index c3b1e0bc6..fda970d44 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -30,8 +30,7 @@ use consensus::{ }; use iggy_common::header::Command2; use iggy_common::{ - INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, - Segment, SegmentStorage, + IggyByteSize, PartitionStats, Segment, SegmentStorage, header::{ ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader, }, @@ -355,13 +354,13 @@ where } async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { - let header = message.header(); + let header = *message.header(); let namespace = IggyNamespace::from_raw(header.namespace); let consensus = self .consensus() .expect("on_replicate: consensus not initialized"); - let current_op = match replicate_preflight(consensus, header) { + let current_op = match replicate_preflight(consensus, &header) { Ok(current_op) => current_op, Err(reason) => { warn!( @@ -372,7 +371,7 @@ where } }; - let is_old_prepare = fence_old_prepare_by_commit(consensus, header); + let is_old_prepare = fence_old_prepare_by_commit(consensus, &header); if is_old_prepare { warn!("received old prepare, not replicating"); } else { @@ -387,9 +386,9 @@ where // TODO: Figure out the flow of the partition operations. // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! - self.apply_replicated_operation(&namespace, &message).await; + self.apply_replicated_operation(&namespace, message).await; - self.send_prepare_ok(header).await; + self.send_prepare_ok(&header).await; if consensus.is_follower() { self.commit_journal(namespace); @@ -539,37 +538,21 @@ where .register_namespace(ns); } - // TODO: Move this elsewhere, also do not reallocate, we do reallocationg now becauise we use PooledBuffer for the batch body - // but `Bytes` for `Message` payload. - fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut { - assert!(body.len() >= 4, "prepare body too small for batch header"); - let count = u32::from_le_bytes(body[0..4].try_into().unwrap()); - let indexes_len = count as usize * INDEX_SIZE; - let indexes_end = 4 + indexes_len; - assert!( - body.len() >= indexes_end, - "prepare body too small for {count} indexes", - ); - - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0); - let messages = PooledBuffer::from(&body[indexes_end..]); - IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages) - } - async fn apply_replicated_operation( &self, namespace: &IggyNamespace, - message: &Message<PrepareHeader>, + message: Message<PrepareHeader>, ) { let consensus = self .consensus() .expect("apply_replicated_operation: consensus not initialized"); - let header = message.header(); + let header = *message.header(); + // TODO: WE have to distinguish between an `message` recv by leader and follower. + // In the follower path, we have to skip the `prepare_for_persistance` path, just append to journal. match header.operation { Operation::SendMessages => { - let body = message.body_bytes(); - self.append_send_messages_to_journal(namespace, body.as_ref()) + self.append_send_messages_to_journal(namespace, message) .await; debug!( replica = consensus.replica(), @@ -598,12 +581,7 @@ where } } - async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) { - let batch = Self::batch_from_body(body); - self.append_messages_to_journal(namespace, batch).await; - } - - /// Append a batch to a partition's journal with offset assignment. + /// Append a prepare message to a partition's journal with offset assignment. /// /// Updates `segment.current_position` (logical position for indexing) but /// not `segment.end_offset` or `segment.end_timestamp` (committed state). @@ -611,15 +589,15 @@ where /// /// Uses `dirty_offset` for offset assignment so that multiple prepares /// can be pipelined before any commit. - async fn append_messages_to_journal( + async fn append_send_messages_to_journal( &self, namespace: &IggyNamespace, - batch: IggyMessagesBatchMut, + message: Message<PrepareHeader>, ) { let partition = self .get_mut_by_ns(namespace) - .expect("append_messages_to_journal: partition not found for namespace"); - let _ = partition.append_messages(batch).await; + .expect("append_send_messages_to_journal: partition not found for namespace"); + let _ = partition.append_messages(message).await; } /// Replicate a prepare message to the next replica in the chain. diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 93d3733fc..2139290d3 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use iggy_common::{ - INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer, + IggyMessagesBatchMut, IggyMessagesBatchSet, header::{Operation, PrepareHeader}, message::Message, }; @@ -29,21 +29,6 @@ use std::{ const ZERO_LEN: usize = 0; -// TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`. -/// No-op storage backend for the in-memory partition journal. -#[derive(Debug)] -pub struct Noop; - -impl Storage for Noop { - type Buffer = (); - - async fn write(&self, _buf: ()) -> usize { - 0 - } - - async fn read(&self, _offset: usize, _len: usize) -> () {} -} - /// Lookup key for querying messages from the journal. #[derive(Debug, Clone, Copy)] pub enum MessageLookup { @@ -59,42 +44,7 @@ impl std::ops::Deref for MessageLookup { } } -// [LEGACY] -pub struct PartitionJournal { - batch_set: UnsafeCell<IggyMessagesBatchSet>, -} - -impl PartitionJournal { - pub fn new() -> Self { - Self { - batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()), - } - } - - /// Drain all accumulated batches, returning the batch set. - pub fn commit(&self) -> IggyMessagesBatchSet { - let batch_set = unsafe { &mut *self.batch_set.get() }; - std::mem::take(batch_set) - } - - pub fn is_empty(&self) -> bool { - let batch_set = unsafe { &*self.batch_set.get() }; - batch_set.is_empty() - } -} - -impl Default for PartitionJournal { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for PartitionJournal { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PartitionJournal").finish() - } -} - +#[allow(dead_code)] pub trait PartitionJournal2<S>: Journal<S> where S: Storage, @@ -104,7 +54,7 @@ where fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessagesBatchSet>>; } -#[derive(Default)] +#[derive(Debug, Default)] pub struct PartitionJournalMemStorage { entries: UnsafeCell<Vec<Bytes>>, op_to_index: UnsafeCell<HashMap<u64, usize>>, @@ -154,56 +104,100 @@ where inner: UnsafeCell<JournalInner<S>>, } -struct JournalInner<S> +impl<S> Default for PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes> + Default, +{ + fn default() -> Self { + Self { + message_offset_to_op: UnsafeCell::new(BTreeMap::new()), + timestamp_to_op: UnsafeCell::new(BTreeMap::new()), + headers: UnsafeCell::new(Vec::new()), + inner: UnsafeCell::new(JournalInner { + storage: S::default(), + }), + } + } +} + +impl<S> std::fmt::Debug for PartitionJournal2Impl<S> where S: Storage<Buffer = Bytes>, { - storage: S, + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionJournal2Impl").finish() + } } -impl<S> PartitionJournal2Impl<S> +struct JournalInner<S> where S: Storage<Buffer = Bytes>, { - fn decode_send_messages_batch(body: bytes::Bytes) -> Option<IggyMessagesBatchMut> { - // TODO: This is bad, - let mut body = body - .try_into_mut() - .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + storage: S, +} - if body.len() < 4 { - return None; - } +impl PartitionJournalMemStorage { + fn drain(&self) -> Vec<Bytes> { + let entries = unsafe { &mut *self.entries.get() }; + let drained = std::mem::take(entries); + let op_to_index = unsafe { &mut *self.op_to_index.get() }; + op_to_index.clear(); + drained + } - let count_bytes = body.split_to(4); - let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); - let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + fn is_empty(&self) -> bool { + let entries = unsafe { &*self.entries.get() }; + entries.is_empty() + } +} - if body.len() < indexes_len { - return None; +impl PartitionJournal2Impl<PartitionJournalMemStorage> { + /// Drain all accumulated batches, matching the legacy PartitionJournal API. + pub fn commit(&self) -> IggyMessagesBatchSet { + let entries = { + let inner = unsafe { &*self.inner.get() }; + inner.storage.drain() + }; + + let mut messages = Vec::with_capacity(entries.len()); + for bytes in entries { + if let Ok(message) = Message::from_bytes(bytes) { + messages.push(message); + } } - let indexes_bytes = body.split_to(indexes_len); - let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); - let messages = PooledBuffer::from(body); + let headers = unsafe { &mut *self.headers.get() }; + headers.clear(); + let offsets = unsafe { &mut *self.message_offset_to_op.get() }; + offsets.clear(); + let timestamps = unsafe { &mut *self.timestamp_to_op.get() }; + timestamps.clear(); + + Self::messages_to_batch_set(&messages) + } - Some(IggyMessagesBatchMut::from_indexes_and_messages( - indexes, messages, - )) + pub fn is_empty(&self) -> bool { + let inner = unsafe { &*self.inner.get() }; + inner.storage.is_empty() } +} +impl<S> PartitionJournal2Impl<S> +where + S: Storage<Buffer = Bytes>, +{ fn message_to_batch(message: &Message<PrepareHeader>) -> Option<IggyMessagesBatchMut> { if message.header().operation != Operation::SendMessages { return None; } - Self::decode_send_messages_batch(message.body_bytes()) + crate::decode_send_messages_batch(message.body_bytes()) } - fn messages_to_batch_set(messages: Vec<Message<PrepareHeader>>) -> IggyMessagesBatchSet { + fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> IggyMessagesBatchSet { let mut batch_set = IggyMessagesBatchSet::empty(); - for message in &messages { + for message in messages { if let Some(batch) = Self::message_to_batch(message) { batch_set.add_batch(batch); } @@ -212,6 +206,7 @@ where batch_set } + #[allow(dead_code)] fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> { match query { MessageLookup::Offset { offset, .. } => { @@ -240,9 +235,13 @@ where inner.storage.read(offset, ZERO_LEN).await }; - Some(Message::from_bytes(bytes).expect("invalid message bytes read from storage")) + Some( + Message::from_bytes(bytes) + .expect("partition.journal.storage.read: invalid bytes for message"), + ) } + #[allow(dead_code)] async fn load_messages_from_storage( &self, start_op: u64, @@ -294,7 +293,7 @@ where let prev_op = header.op - 1; let headers = unsafe { &*self.headers.get() }; - headers.get(prev_op as usize) + headers.iter().find(|candidate| candidate.op == prev_op) } async fn append(&self, entry: Self::Entry) { @@ -344,7 +343,7 @@ where let messages = self.load_messages_from_storage(start_op, count).await; - let batch_set = Self::messages_to_batch_set(messages); + let batch_set = Self::messages_to_batch_set(&messages); let result = match query { MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count), MessageLookup::Timestamp { timestamp, count } => { @@ -359,51 +358,3 @@ where } } } - -impl Journal<Noop> for PartitionJournal { - type Header = MessageLookup; - type Entry = IggyMessagesBatchMut; - type HeaderRef<'a> = MessageLookup; - - fn header(&self, _idx: usize) -> Option<Self::HeaderRef<'_>> { - unreachable!("fn header: header lookup not supported for partition journal."); - } - - fn previous_header(&self, _header: &Self::Header) -> Option<Self::HeaderRef<'_>> { - unreachable!("fn previous_header: header lookup not supported for partition journal."); - } - - async fn append(&self, entry: Self::Entry) { - let batch_set = unsafe { &mut *self.batch_set.get() }; - batch_set.add_batch(entry); - } - - async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> { - // Entry lookups go through SegmentedLog which uses JournalInfo - // to construct MessageLookup headers. The actual query is done - // via get() below, not through the Journal trait. - let _ = header; - unreachable!("fn entry: use SegmentedLog::get() instead for partition journal lookups."); - } -} - -impl PartitionJournal { - /// Query messages by offset or timestamp with count. - /// - /// This is called by `SegmentedLog` using `MessageLookup` headers - /// constructed from `JournalInfo`. - pub fn get(&self, header: &MessageLookup) -> Option<IggyMessagesBatchSet> { - let batch_set = unsafe { &*self.batch_set.get() }; - let result = match header { - MessageLookup::Offset { offset, count } => batch_set.get_by_offset(*offset, *count), - MessageLookup::Timestamp { timestamp, count } => { - batch_set.get_by_timestamp(*timestamp, *count) - } - }; - if result.is_empty() { - None - } else { - Some(result) - } - } -} diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index d997d72b7..1f18e32ed 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -23,7 +23,11 @@ mod journal; mod log; mod types; -use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet}; +use bytes::{Bytes, BytesMut}; +use iggy_common::{ + INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, + PooledBuffer, header::PrepareHeader, message::Message, +}; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; pub use types::{ @@ -31,6 +35,33 @@ pub use types::{ SendMessagesResult, }; +pub(crate) fn decode_send_messages_batch(body: Bytes) -> Option<IggyMessagesBatchMut> { + // TODO: This very is bad, IGGY-114 Fixes this. + let mut body = body + .try_into_mut() + .unwrap_or_else(|body| BytesMut::from(body.as_ref())); + + if body.len() < 4 { + return None; + } + + let count_bytes = body.split_to(4); + let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?); + let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?; + + if body.len() < indexes_len { + return None; + } + + let indexes_bytes = body.split_to(indexes_len); + let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0); + let messages = PooledBuffer::from(body); + + Some(IggyMessagesBatchMut::from_indexes_and_messages( + indexes, messages, + )) +} + /// Partition-level data plane operations. /// /// `send_messages` MUST only append to the partition journal (prepare phase), @@ -38,7 +69,7 @@ pub use types::{ pub trait Partition { fn append_messages( &mut self, - batch: IggyMessagesBatchMut, + message: Message<PrepareHeader>, ) -> impl Future<Output = Result<AppendResult, IggyError>>; fn poll_messages( diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs index 316a54fe7..9f4d64b79 100644 --- a/core/simulator/src/deps.rs +++ b/core/simulator/src/deps.rs @@ -46,7 +46,7 @@ impl Storage for MemStorage { } async fn read(&self, offset: usize, len: usize) -> Self::Buffer { - let buffer = vec![0; len]; + let mut buffer = vec![0; len]; let data = self.data.borrow(); let end = offset + buffer.len(); if offset < data.len() && end <= data.len() {
