numinnex commented on code in PR #2916:
URL: https://github.com/apache/iggy/pull/2916#discussion_r2931352057


##########
core/journal/src/file_storage.rs:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Storage;
+use compio::buf::IoBuf;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
+use std::cell::{Cell, RefCell};
+use std::io;
+use std::path::{Path, PathBuf};
+
+/// File-backed storage implementing the `Storage` trait.
+pub struct FileStorage {
+    file: RefCell<compio::fs::File>,

Review Comment:
   You cannot use `RefCell` together with components that have `async` methods. 



##########
core/journal/src/lib.rs:
##########
@@ -30,16 +34,40 @@ where
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>>;
     fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>>;
 
-    fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+    fn append(&self, entry: Self::Entry) -> impl Future<Output = 
io::Result<()>>;
     fn entry(&self, header: &Self::Header) -> impl Future<Output = 
Option<Self::Entry>>;
+
+    /// Advance the snapshot watermark so entries at or below `op` may be
+    /// evicted from the journal's in-memory index. The default is a no-op
+    /// for journals that do not require this watermark.
+    fn set_snapshot_op(&self, _op: u64) {}
+
+    /// Number of entries that can be appended before the journal would need
+    /// to evict un-snapshotted slots. Returns `None` for journals that don't 
persist to disk.
+    fn remaining_capacity(&self) -> Option<usize> {
+        None
+    }
+
+    /// Remove snapshotted entries from the WAL to reclaim disk space.
+    /// The default is a no-op for journals that do not persist to disk.
+    ///
+    /// # Errors
+    /// Returns an I/O error if compaction fails.
+    fn compact(&self) -> impl Future<Output = io::Result<()>> {
+        async { Ok(()) }
+    }
 }
 
 // TODO: Move to other crate.
 pub trait Storage {
     type Buffer;
 
-    fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
-    fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output 
= Self::Buffer>;
+    fn write(&self, buf: Self::Buffer) -> impl Future<Output = 
io::Result<usize>>;

Review Comment:
   I was thinking: since I've added `offset` to the read method, shouldn't we 
rename this method to `write_at` — `fn write_at(&self, offset: usize, buf: 
Self::Buffer)` — and build `fn append(&self, buf: Self::Buffer)` on top of 
it? What do you think? I am wondering which API would serve us better from the 
perspective of implementing an in-memory storage, both for server usage as 
well as for the simulator.



##########
core/journal/src/lib.rs:
##########
@@ -30,16 +34,40 @@ where
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>>;
     fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>>;
 
-    fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+    fn append(&self, entry: Self::Entry) -> impl Future<Output = 
io::Result<()>>;
     fn entry(&self, header: &Self::Header) -> impl Future<Output = 
Option<Self::Entry>>;
+
+    /// Advance the snapshot watermark so entries at or below `op` may be
+    /// evicted from the journal's in-memory index. The default is a no-op
+    /// for journals that do not require this watermark.
+    fn set_snapshot_op(&self, _op: u64) {}
+
+    /// Number of entries that can be appended before the journal would need
+    /// to evict un-snapshotted slots. Returns `None` for journals that don't 
persist to disk.
+    fn remaining_capacity(&self) -> Option<usize> {
+        None
+    }
+
+    /// Remove snapshotted entries from the WAL to reclaim disk space.
+    /// The default is a no-op for journals that do not persist to disk.
+    ///
+    /// # Errors
+    /// Returns an I/O error if compaction fails.
+    fn compact(&self) -> impl Future<Output = io::Result<()>> {
+        async { Ok(()) }
+    }

Review Comment:
   I think I'd prefer the journal to have some sort of `drain` method that 
allows "extracting" a range of items, similarly to how `Vec::drain(begin..end)` 
works. This way we do not hack extra APIs onto the interface just to cover an 
edge case; instead we create a general-purpose API that can be used to shrink 
the journal, and we handle the watermark outside of the `Journal`. 
   
   And yeah, a `Stream` iterator would be perfect, but since `AsyncIterator` is 
still unstable and is probably going to replace the `Stream` trait, we can 
return a non-async drain iterator and do the disk read for the entire range in 
one go. 



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -52,6 +53,47 @@ impl IggySnapshot {
     pub const fn snapshot(&self) -> &MetadataSnapshot {
         &self.snapshot
     }
+
+    /// Persist the snapshot to disk.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if serialization or I/O fails.
+    pub fn persist(&self, path: &Path) -> Result<(), SnapshotError> {
+        use std::fs;
+        use std::io::Write;
+
+        let encoded = self.encode()?;
+
+        let tmp_path = path.with_extension("bin.tmp");
+
+        let mut file = fs::File::create(&tmp_path)?;
+        file.write_all(&encoded)?;
+        file.sync_all()?;
+        drop(file);
+
+        fs::rename(&tmp_path, path)?;
+
+        // Fsync the parent directory to ensure the rename is durable.
+        if let Some(parent) = path.parent() {
+            let dir = fs::File::open(parent)?;
+            dir.sync_all()?;
+        }
+
+        Ok(())
+    }
+
+    /// Load a snapshot from disk.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if the file cannot be read or deserialization 
fails.
+    pub fn load(path: &Path) -> Result<Self, SnapshotError> {
+        let data = std::fs::read(path)?;
+
+        // TODO: when checksum is added we need to check

Review Comment:
   Please create a Linear issue to track that. 



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -99,6 +140,37 @@ pub struct IggyMetadata<C, J, S, M> {
     pub snapshot: Option<S>,
     /// State machine - lives on all shards
     pub mux_stm: M,
+    /// Root data directory, used by checkpoint to persist snapshots.

Review Comment:
   Maybe it's a good idea to store some sort of `snapshot_coordinator` struct 
there that would hide those details away? 



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -52,6 +53,47 @@ impl IggySnapshot {
     pub const fn snapshot(&self) -> &MetadataSnapshot {
         &self.snapshot
     }
+
+    /// Persist the snapshot to disk.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if serialization or I/O fails.
+    pub fn persist(&self, path: &Path) -> Result<(), SnapshotError> {
+        use std::fs;
+        use std::io::Write;
+
+        let encoded = self.encode()?;
+
+        let tmp_path = path.with_extension("bin.tmp");
+
+        let mut file = fs::File::create(&tmp_path)?;
+        file.write_all(&encoded)?;
+        file.sync_all()?;
+        drop(file);
+
+        fs::rename(&tmp_path, path)?;
+
+        // Fsync the parent directory to ensure the rename is durable.
+        if let Some(parent) = path.parent() {
+            let dir = fs::File::open(parent)?;
+            dir.sync_all()?;
+        }
+
+        Ok(())

Review Comment:
   I think we should handle those errors inline here, or at least convert them 
to error types that represent the different stages of snapshot persistence, 
since depending on the stage we can be left in a broken state that is still 
recoverable. For example, if we were to fail in the `rename` stage, we can 
safely retry that operation, but if we were to fail on lines 70-71, we have to 
start from scratch, as `write_all` could partially write the buffer and then 
fail — and the same goes for `sync_all`. 
   
   I also think we should move to `O_SYNC` and `O_DSYNC` for the metadata, but 
that can wait, as I am currently working on adding support for `O_DIRECT` for 
`partitions`, and this requires quite a few changes to the `Buffer` that we 
use, since `Bytes`/`BytesMut` does not allow allocating aligned buffers.



##########
core/metadata/src/impls/metadata.rs:
##########
@@ -325,3 +432,35 @@ where
         send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
+
+#[allow(unused)]
+impl<C, J, S, M> IggyMetadata<C, J, S, M> {
+    /// Create a snapshot from the current state machine and persist it to 
disk.
+    ///
+    /// After the snapshot is durably persisted, advances the journal's
+    /// snapshot watermark so that entries at or below `last_op` may be
+    /// evicted from the ring buffer on future appends.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if snapshotting, persistence, or compaction 
fails.
+    #[allow(clippy::future_not_send)]
+    pub async fn checkpoint(&self, data_dir: &Path, last_op: u64) -> 
Result<(), SnapshotError>

Review Comment:
   If we were to go with the suggestion in the comment on line R143, then this 
would become part of the coordinator I mentioned there.



##########
core/journal/src/file_storage.rs:
##########
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Storage;
+use compio::buf::IoBuf;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
+use std::cell::{Cell, RefCell};
+use std::io;
+use std::path::{Path, PathBuf};
+
+/// File-backed storage implementing the `Storage` trait.
+pub struct FileStorage {
+    file: RefCell<compio::fs::File>,
+    write_offset: Cell<u64>,
+    path: PathBuf,
+}
+
+#[allow(clippy::future_not_send, clippy::await_holding_refcell_ref)]

Review Comment:
   Let's not silence the `await_holding_refcell_ref`, use an `UnsafeCell` 
instead of `RefCell`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to