atharvalade commented on code in PR #2916: URL: https://github.com/apache/iggy/pull/2916#discussion_r2919908678
########## core/journal/src/metadata_journal.rs: ########## @@ -0,0 +1,686 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::file_storage::FileStorage; +use crate::{Journal, JournalHandle}; +use bytes::Bytes; +use iggy_common::header::{Command2, PrepareHeader}; +use iggy_common::message::Message; +use std::cell::{Cell, Ref, RefCell}; +use std::fmt; +use std::fs; +use std::io; +use std::io::Write; +use std::path::Path; + +const HEADER_SIZE: usize = size_of::<PrepareHeader>(); + +/// Maximum allowed size for a single WAL entry (64 MiB). +/// +/// A header with `size` exceeding this limit is treated as corrupt. This +/// prevents a bit-flipped size field (e.g. `0xFFFF_FFFF`) from causing a +/// multi-GiB allocation during the WAL scan. +const MAX_ENTRY_SIZE: u64 = 64 * 1024 * 1024; + +/// Number of slots in the journal ring buffer. +/// +/// Must be larger than the maximum number of entries between consecutive +/// snapshots. If the journal wraps past this window, older un-snapshotted +/// entries are silently evicted from the in-memory index (the WAL file +/// still contains them, but they become unreachable for recovery). +/// +/// **NOTE:** This number needs to be chosen in balance between number of +/// entries in [`core::consensus::pipeline_prepare_queue_max`]. Because this number controls +/// how many committed but not yet snapshotted entries that the buffer can +/// hold. This may need to be tuned properly. +pub const SLOT_COUNT: usize = 1024; + +/// Error type for journal operations. +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum JournalError { + Io(io::Error), +} + +impl fmt::Display for JournalError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(e) => write!(f, "journal I/O error: {e}"), + } + } +} + +impl std::error::Error for JournalError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Io(e) => Some(e), + } + } +} + +impl From<io::Error> for JournalError { + fn from(e: io::Error) -> Self { + Self::Io(e) + } +} + +/// Persistent metadata journal backed by an append-only WAL file. +/// +/// Each WAL entry is a raw `Message<PrepareHeader>`: +/// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]` +/// +/// The in-memory index is a fixed-size slot array indexed by +/// `op % SLOT_COUNT`. +pub struct MetadataJournal { + storage: FileStorage, + headers: RefCell<Vec<Option<PrepareHeader>>>, + offsets: RefCell<Vec<Option<u64>>>, + last_op: Cell<Option<u64>>, + /// Highest op that has been durably snapshotted. Entries with `op <= snapshot_op` + /// are safe to evict from the slot array. Appending an entry that would evict + /// an un-snapshotted entry (op > `snapshot_op`) panics and the upper layer must + /// take a snapshot before the journal wraps. + snapshot_op: Cell<u64>, +} + +impl fmt::Debug for MetadataJournal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MetadataJournal") + .field("write_offset", &self.storage.file_len()) + .field("last_op", &self.last_op.get()) + .finish_non_exhaustive() + } +} + +#[allow(clippy::cast_possible_truncation)] +const fn slot_for_op(op: u64) -> usize { + op as usize % SLOT_COUNT +} + +#[allow(clippy::cast_possible_truncation)] +impl MetadataJournal { + /// Open the WAL file, scanning forward to rebuild the in-memory index. + /// + /// `snapshot_op` is the highest op that has been durably snapshotted. + /// It must be provided so that `append()` can detect slot collisions + /// that would evict un-snapshotted entries. + /// + /// If a truncated entry is found at the tail (crash during write), + /// the file is truncated to the last complete entry. + /// + /// # Errors + /// Returns `JournalError::Io` if the WAL file cannot be opened or read. + pub fn open(path: &Path, snapshot_op: u64) -> Result<Self, JournalError> { + let storage = FileStorage::open(path)?; + let file_len = storage.file_len(); + let mut headers: Vec<Option<PrepareHeader>> = vec![None; SLOT_COUNT]; + let mut offsets: Vec<Option<u64>> = vec![None; SLOT_COUNT]; + let mut last_op: Option<u64> = None; + let mut pos: u64 = 0; + let mut header_buf = vec![0u8; HEADER_SIZE]; + + while pos + HEADER_SIZE as u64 <= file_len { + // Read the 256-byte header + storage.read_sync(pos, &mut header_buf)?; + let header: PrepareHeader = *bytemuck::checked::from_bytes(&header_buf); + + // Validate: must be a Prepare command with sane size + if header.command != Command2::Prepare + || (header.size as usize) < HEADER_SIZE + || u64::from(header.size) > MAX_ENTRY_SIZE + { + // Corrupt or non-prepare entry, truncate here + storage.truncate(pos)?; + break; + } + + let entry_size = u64::from(header.size); + + // Check if the full entry fits + if pos + entry_size > file_len { + // Truncated entry at tail + // This handles the case where crash happened during write and + // only header was written and body was not. so we truncate the file to the start of the entry. + storage.truncate(pos)?; + break; + } + + let slot = slot_for_op(header.op); + + // Note: Regarding duplicate op in WAL. We rewrite it with whichever + // is the latest entry. + headers[slot] = Some(header); + offsets[slot] = Some(pos); + + match last_op { + Some(current) if header.op > current => last_op = Some(header.op), + None => last_op = Some(header.op), + _ => {} + } + + pos += entry_size; + } + + // If there are leftover bytes less than a header, truncate them + if pos < storage.file_len() { + storage.truncate(pos)?; + } + + Ok(Self { + storage, + headers: RefCell::new(headers), + offsets: RefCell::new(offsets), + last_op: Cell::new(last_op), + snapshot_op: Cell::new(snapshot_op), + }) + } + + /// Return headers with `op >= from_op`, sorted by op. + pub fn iter_headers_from(&self, from_op: u64) -> Vec<PrepareHeader> { + let headers = self.headers.borrow(); + let mut result: Vec<PrepareHeader> = headers + .iter() + .filter_map(|slot| slot.filter(|h| h.op >= from_op)) + .collect(); + result.sort_unstable_by_key(|h| h.op); + result + } + + /// Highest op number in the index, or `None` if empty. + pub const fn last_op(&self) -> Option<u64> { + self.last_op.get() + } + + /// Advance the snapshot watermark. The caller must ensure `op` is + /// monotonically increasing and corresponds to a durable snapshot. + /// + /// # Panics + /// Panics if `op` is less than the current snapshot watermark. + pub fn set_snapshot_op(&self, op: u64) { + assert!( + op >= self.snapshot_op.get(), + "snapshot_op must be monotonically increasing: {} -> {}", + self.snapshot_op.get(), + op + ); + self.snapshot_op.set(op); + } + + /// Access the underlying storage (for fsync in tests, etc.). + pub const fn storage_ref(&self) -> &FileStorage { + &self.storage + } + + /// Synchronous entry read for recovery. + /// + /// Returns `Ok(None)` if the op is not in the index. + /// + /// # Errors + /// Returns an I/O error if the read fails or the entry is malformed. + pub fn entry_sync(&self, header: &PrepareHeader) -> io::Result<Option<Message<PrepareHeader>>> { + let offsets = self.offsets.borrow(); + let slot = slot_for_op(header.op); + let Some(offset) = offsets[slot] else { + return Ok(None); + }; + let size = header.size as usize; + drop(offsets); + let mut buf = vec![0u8; size]; + self.storage.read_sync(offset, &mut buf)?; + let msg = Message::from_bytes(Bytes::from(buf)) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + Ok(Some(msg)) + } +} + +#[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + clippy::future_not_send +)] +impl Journal<FileStorage> for MetadataJournal { + type Header = PrepareHeader; + type Entry = Message<PrepareHeader>; + type HeaderRef<'a> = Ref<'a, PrepareHeader>; + + fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> { + let headers = self.headers.borrow(); + Ref::filter_map(headers, |h| { + let slot = slot_for_op(idx as u64); + let header = h[slot].as_ref()?; + if header.op == idx as u64 { + Some(header) + } else { + None + } + }) + .ok() + } + + fn previous_header(&self, header: &Self::Header) -> Option<Self::HeaderRef<'_>> { + if header.op == 0 { + return None; + } + self.header((header.op - 1) as usize) + } + + fn set_snapshot_op(&self, op: u64) { + Self::set_snapshot_op(self, op); + } + + fn remaining_capacity(&self) -> Option<usize> { + let Some(last) = self.last_op.get() else { + return Some(SLOT_COUNT); + }; + let snapshot = self.snapshot_op.get(); + let used = (last - snapshot) as usize; + Some(SLOT_COUNT.saturating_sub(used)) + } Review Comment: `let used = (last - snapshot) as usize` will underflow if `last_op < snapshot_op`, which is possible after a partial WAL recovery where the snapshot is ahead of the truncated WAL (e.g. snapshot at op 100 but WAL only has entries up to op 50). In debug builds this panics. In release builds it wraps to a huge value, `saturating_sub` returns 0, and an unnecessary forced checkpoint is triggered. I believe using `last.saturating_sub(snapshot)` or adding a guard that handles this case will help. ########## core/metadata/src/impls/metadata.rs: ########## @@ -325,3 +404,31 @@ where send_prepare_ok_common(consensus, header, Some(persisted)).await; } } + +#[allow(unused)] +impl<C, J, S, M> IggyMetadata<C, J, S, M> { + /// Create a snapshot from the current state machine and persist it to disk. + /// + /// After the snapshot is durably persisted, advances the journal's + /// snapshot watermark so that entries at or below `last_op` may be + /// evicted from the ring buffer on future appends. + /// + /// # Errors + /// Returns `SnapshotError` if snapshotting, persistence, or compaction fails. + pub fn checkpoint(&self, data_dir: &Path, last_op: u64) -> Result<(), SnapshotError> + where + M: FillSnapshot<MetadataSnapshot>, + J: JournalHandle, + { + let snapshot = IggySnapshot::create(&self.mux_stm, last_op)?; + let path = data_dir.join(super::METADATA_DIR).join("snapshot.bin"); + snapshot.persist(&path)?; + + if let Some(journal) = &self.journal { + journal.handle().set_snapshot_op(last_op); + journal.handle().compact().map_err(SnapshotError::Io)?; Review Comment: `IggySnapshot::create` labels the snapshot with `sequence_number = last_op` (which is `current_op` from the caller), but the state machine has only applied entries up to the commit number, not up to the latest prepare. On recovery, `replay_from` is computed as `snapshot.sequence_number() + 1`, so entries between the real applied op and `current_op` get skipped and are never replayed into the state machine. Same root cause as issue 1: `last_op` should be the commit number. ########## core/metadata/src/impls/metadata.rs: ########## @@ -151,10 +198,36 @@ where // TODO add assertions for valid state here. - // TODO verify that the current prepare fits in the WAL. - // TODO handle gap in ops. + // Force a checkpoint if the journal is running low on capacity. + if journal + .handle() + .remaining_capacity() + .is_some_and(|c| c <= CHECKPOINT_MARGIN) + { + if let Some(data_dir) = &self.data_dir { + let snap_op = current_op; + if let Err(e) = self.checkpoint(data_dir, snap_op) { + error!( Review Comment: `snap_op` is set to `current_op`, which is the last prepared op, not the last committed op. When `checkpoint()` runs with this value it calls `set_snapshot_op` and `compact()`, which removes all WAL entries at or below that op. But entries between `commit_number + 1` and `current_op` are still in the pipeline awaiting quorum. When `on_ack` later tries to look them up via `journal.handle().entry()` at line 300, it hits the `unwrap_or_else(|| panic!(...))` because the entries were already compacted away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
