This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ad46a979c chore(shard,metadata): enable pedantic and nursery clippy lints (#2896)
ad46a979c is described below

commit ad46a979c8a5e301ee9d7ba197785bd5d1d06f9b
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 11:47:05 2026 +0100

    chore(shard,metadata): enable pedantic and nursery clippy lints (#2896)
---
 core/bench/Cargo.toml                              |  2 +-
 core/message_bus/Cargo.toml                        |  2 +-
 core/metadata/Cargo.toml                           |  5 ++++
 core/metadata/src/impls/metadata.rs                | 20 +++++++++----
 core/metadata/src/permissioner/mod.rs              | 13 ++++-----
 .../permissioner_rules/consumer_groups.rs          |  1 +
 .../permissioner_rules/consumer_offsets.rs         |  1 +
 .../permissioner/permissioner_rules/messages.rs    |  5 ++--
 .../permissioner/permissioner_rules/partitions.rs  |  1 +
 .../permissioner/permissioner_rules/segments.rs    |  1 +
 .../src/permissioner/permissioner_rules/streams.rs |  1 +
 .../src/permissioner/permissioner_rules/system.rs  |  1 +
 .../src/permissioner/permissioner_rules/topics.rs  |  7 +++--
 .../src/permissioner/permissioner_rules/users.rs   |  1 +
 core/metadata/src/stats/mod.rs                     |  4 +--
 core/metadata/src/stm/consumer_group.rs            | 17 +++++------
 core/metadata/src/stm/mod.rs                       | 18 +++++++++---
 core/metadata/src/stm/mux.rs                       | 12 ++++----
 core/metadata/src/stm/snapshot.rs                  | 33 ++++++++++++++--------
 core/metadata/src/stm/stream.rs                    |  5 +++-
 core/metadata/src/stm/user.rs                      |  7 ++++-
 core/partitions/Cargo.toml                         |  2 +-
 core/shard/Cargo.toml                              |  5 ++++
 core/shard/src/lib.rs                              | 18 ++++++++++--
 core/shard/src/router.rs                           |  4 ++-
 core/shard/src/shards_table.rs                     |  7 +++--
 26 files changed, 133 insertions(+), 60 deletions(-)

diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index a94add9fa..2cd42a80d 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -57,4 +57,4 @@ uuid = { workspace = true }
 [lints.clippy]
 enum_glob_use = "deny"
 pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index fabab64ce..aabb56543 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -34,4 +34,4 @@ rand = { workspace = true }
 [lints.clippy]
 enum_glob_use = "deny"
 pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index fcad045bf..6cdad3ad4 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -40,3 +40,8 @@ rmp-serde = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 slab = { workspace = true }
 tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 41cb21296..9751eb0c7 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -41,13 +41,15 @@ pub struct IggySnapshot {
 
 #[allow(unused)]
 impl IggySnapshot {
+    #[must_use]
     pub fn new(sequence_number: u64) -> Self {
         Self {
             snapshot: MetadataSnapshot::new(sequence_number),
         }
     }
 
-    pub fn snapshot(&self) -> &MetadataSnapshot {
+    #[must_use]
+    pub const fn snapshot(&self) -> &MetadataSnapshot {
         &self.snapshot
     }
 }
@@ -99,6 +101,7 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
+#[allow(clippy::future_not_send)]
 impl<B, J, S, M> Plane<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -137,12 +140,13 @@ where
         };
 
         // TODO: Handle idx calculation, for now using header.op, but since the journal may get compacted, this may not be correct.
+        #[allow(clippy::cast_possible_truncation)]
         let is_old_prepare = fence_old_prepare_by_commit(consensus, header)
             || journal.handle().header(header.op as usize).is_some();
-        if !is_old_prepare {
-            self.replicate(message.clone()).await;
-        } else {
+        if is_old_prepare {
             warn!("received old prepare, not replicating");
+        } else {
+            self.replicate(message.clone()).await;
         }
 
         // TODO add assertions for valid state here.
@@ -245,7 +249,7 @@ where
                     .message_bus()
                     .send_to_client(prepare_header.client, generic_reply)
                     .await
-                    .unwrap()
+                    .unwrap();
             }
         }
     }
@@ -285,6 +289,7 @@ where
     /// - Primary sends to first backup
     /// - Each backup forwards to the next
     /// - Stops when we would forward back to primary
+    #[allow(clippy::future_not_send)]
     async fn replicate(&self, message: Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
@@ -292,6 +297,7 @@ where
         let header = message.header();
 
         // TODO: calculate the index;
+        #[allow(clippy::cast_possible_truncation)]
         let idx = header.op as usize;
         assert_eq!(header.command, Command2::Prepare);
         assert!(
@@ -304,12 +310,14 @@ where
     // TODO: Implement jump_to_newer_op
     // fn jump_to_newer_op(&self, header: &PrepareHeader) {}
 
-    fn commit_journal(&self) {
+    #[allow(clippy::unused_self)]
+    const fn commit_journal(&self) {
         // TODO: Implement commit logic
         // Walk through journal from last committed to current commit number
         // Apply each entry to the state machine
     }
 
+    #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
diff --git a/core/metadata/src/permissioner/mod.rs b/core/metadata/src/permissioner/mod.rs
index 52b13443f..74a83ec23 100644
--- a/core/metadata/src/permissioner/mod.rs
+++ b/core/metadata/src/permissioner/mod.rs
@@ -31,16 +31,15 @@ pub struct Permissioner {
 }
 
 impl Permissioner {
+    #[must_use]
     pub fn new() -> Self {
         Self::default()
     }
 
     pub fn init_permissions_for_user(&mut self, user_id: UserId, permissions: Option<Permissions>) {
-        if permissions.is_none() {
+        let Some(permissions) = permissions else {
             return;
-        }
-
-        let permissions = permissions.unwrap();
+        };
         if permissions.global.poll_messages {
             self.users_that_can_poll_messages_from_all_streams
                 .insert(user_id);
@@ -52,11 +51,9 @@ impl Permissioner {
         }
 
         self.users_permissions.insert(user_id, permissions.global);
-        if permissions.streams.is_none() {
+        let Some(streams) = permissions.streams else {
             return;
-        }
-
-        let streams = permissions.streams.unwrap();
+        };
         for (stream_id, stream) in streams {
             if stream.poll_messages {
                 self.users_that_can_poll_messages_from_specific_streams
diff --git a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
index 66022ae2d..2d4bcdd1b 100644
--- a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn create_consumer_group(
         &self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
index f94e3fdc0..ab93c1fe4 100644
--- a/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn get_consumer_offset(
         &self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/messages.rs b/core/metadata/src/permissioner/permissioner_rules/messages.rs
index ad678d0f2..968d1aed4 100644
--- a/core/metadata/src/permissioner/permissioner_rules/messages.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/messages.rs
@@ -19,8 +19,9 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
-    /// Inheritance: manage_streams → read_streams → read_topics → poll_messages
+    /// Inheritance: `manage_streams` -> `read_streams` -> `read_topics` -> `poll_messages`
     pub fn poll_messages(
         &self,
         user_id: u32,
@@ -79,7 +80,7 @@ impl Permissioner {
         Err(IggyError::Unauthorized)
     }
 
-    /// Inheritance: manage_streams → manage_topics → send_messages
+    /// Inheritance: `manage_streams` -> `manage_topics` -> `send_messages`
     pub fn append_messages(
         &self,
         user_id: u32,
diff --git a/core/metadata/src/permissioner/permissioner_rules/partitions.rs b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
index 7f11007a1..11b780fc7 100644
--- a/core/metadata/src/permissioner/permissioner_rules/partitions.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn create_partitions(
         &self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/segments.rs b/core/metadata/src/permissioner/permissioner_rules/segments.rs
index 558e2b0b3..4de5bd2cc 100644
--- a/core/metadata/src/permissioner/permissioner_rules/segments.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/segments.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn delete_segments(
         &self,
diff --git a/core/metadata/src/permissioner/permissioner_rules/streams.rs b/core/metadata/src/permissioner/permissioner_rules/streams.rs
index 153d3d4d5..a978d2fdd 100644
--- a/core/metadata/src/permissioner/permissioner_rules/streams.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/streams.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn get_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> {
         if let Some(global_permissions) = self.users_permissions.get(&user_id)
diff --git a/core/metadata/src/permissioner/permissioner_rules/system.rs b/core/metadata/src/permissioner/permissioner_rules/system.rs
index e2ef0d8c7..e74ced86f 100644
--- a/core/metadata/src/permissioner/permissioner_rules/system.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/system.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn get_stats(&self, user_id: u32) -> Result<(), IggyError> {
         self.get_server_info(user_id)
diff --git a/core/metadata/src/permissioner/permissioner_rules/topics.rs b/core/metadata/src/permissioner/permissioner_rules/topics.rs
index 6e93655c3..443478d7a 100644
--- a/core/metadata/src/permissioner/permissioner_rules/topics.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/topics.rs
@@ -19,8 +19,9 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
-    /// Inheritance: manage_streams → read_streams → read_topics
+    /// Inheritance: `manage_streams` -> `read_streams` -> `read_topics`
     pub fn get_topic(
         &self,
         user_id: u32,
@@ -79,7 +80,7 @@ impl Permissioner {
         Err(IggyError::Unauthorized)
     }
 
-    /// Inheritance: manage_streams → manage_topics
+    /// Inheritance: `manage_streams` -> `manage_topics`
     pub fn create_topic(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> {
         if let Some(global) = self.users_permissions.get(&user_id)
             && (global.manage_streams || global.manage_topics)
@@ -123,7 +124,7 @@ impl Permissioner {
         self.manage_topic(user_id, stream_id, topic_id)
     }
 
-    /// Inheritance: manage_streams → manage_topics
+    /// Inheritance: `manage_streams` -> `manage_topics`
     fn manage_topic(
         &self,
         user_id: u32,
diff --git a/core/metadata/src/permissioner/permissioner_rules/users.rs b/core/metadata/src/permissioner/permissioner_rules/users.rs
index bba4b84a9..3c831b0a6 100644
--- a/core/metadata/src/permissioner/permissioner_rules/users.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/users.rs
@@ -19,6 +19,7 @@
 use crate::permissioner::Permissioner;
 use iggy_common::IggyError;
 
+#[allow(clippy::missing_errors_doc)]
 impl Permissioner {
     pub fn get_user(&self, user_id: u32) -> Result<(), IggyError> {
         self.read_users(user_id)
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
index f4cc3aedc..28dd5c5a4 100644
--- a/core/metadata/src/stats/mod.rs
+++ b/core/metadata/src/stats/mod.rs
@@ -114,7 +114,7 @@ pub struct TopicStats {
 }
 
 impl TopicStats {
-    pub fn new(parent: Arc<StreamStats>) -> Self {
+    pub const fn new(parent: Arc<StreamStats>) -> Self {
         Self {
             parent,
             size_bytes: AtomicU64::new(0),
@@ -258,7 +258,7 @@ pub struct PartitionStats {
 }
 
 impl PartitionStats {
-    pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+    pub const fn new(parent_stats: Arc<TopicStats>) -> Self {
         Self {
             parent: parent_stats,
             messages_count: AtomicU64::new(0),
diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs
index f9e42178f..2eddcdfaa 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -38,6 +38,7 @@ pub struct ConsumerGroupMember {
 }
 
 impl ConsumerGroupMember {
+    #[must_use]
     pub fn new(id: usize, client_id: u32) -> Self {
         Self {
             id,
@@ -57,7 +58,8 @@ pub struct ConsumerGroup {
 }
 
 impl ConsumerGroup {
-    pub fn new(name: Arc<str>) -> Self {
+    #[must_use]
+    pub const fn new(name: Arc<str>) -> Self {
         Self {
             id: 0,
             name,
@@ -74,16 +76,16 @@ impl ConsumerGroup {
             return;
         }
 
-        let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect();
-        for &member_id in &member_ids {
+        let member_keys: Vec<usize> = self.members.iter().map(|(id, _)| id).collect();
+        for &member_id in &member_keys {
             if let Some(member) = self.members.get_mut(member_id) {
                 member.partitions.clear();
             }
         }
 
         for (i, &partition_id) in self.partitions.iter().enumerate() {
-            let member_idx = i % member_count;
-            if let Some(&member_id) = member_ids.get(member_idx)
+            let target = i % member_count;
+            if let Some(&member_id) = member_keys.get(target)
                 && let Some(member) = self.members.get_mut(member_id)
             {
                 member.partitions.push(partition_id);
@@ -180,7 +182,7 @@ impl StateHandler for CreateConsumerGroup {
         let id = state.items.insert(group);
         state.items[id].id = id;
 
-        state.name_index.insert(name.clone(), id);
+        state.name_index.insert(name, id);
 
         if let (Ok(s), Ok(t)) = (
             self.stream_id.get_u32_value(),
@@ -328,7 +330,7 @@ impl Snapshotable for ConsumerGroups {
         let mut group_entries: Vec<(usize, ConsumerGroup)> = Vec::new();
 
         for (slab_key, group_snap) in snapshot.items {
-            let member_entries: Vec<(usize, ConsumerGroupMember)> = group_snap
+            let members: Slab<ConsumerGroupMember> = group_snap
                 .members
                 .into_iter()
                 .map(|(member_key, member_snap)| {
@@ -341,7 +343,6 @@ impl Snapshotable for ConsumerGroups {
                     (member_key, member)
                 })
                 .collect();
-            let members: Slab<ConsumerGroupMember> = member_entries.into_iter().collect();
 
             let group_name: Arc<str> = Arc::from(group_snap.name.as_str());
             let group = ConsumerGroup {
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 82aed80b1..5171207ae 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -23,7 +23,7 @@ pub mod user;
 
 use bytes::Bytes;
 use iggy_common::Either;
-use left_right::*;
+use left_right::{Absorb, ReadHandle, WriteHandle};
 use std::cell::UnsafeCell;
 use std::sync::Arc;
 
@@ -47,18 +47,21 @@ impl<T, O> WriteCell<T, O>
 where
     T: Absorb<O>,
 {
-    pub fn new(write: WriteHandle<T, O>) -> Self {
+    pub const fn new(write: WriteHandle<T, O>) -> Self {
         Self {
             inner: UnsafeCell::new(write),
         }
     }
 
+    /// # Panics
+    /// Panics if the inner `UnsafeCell` pointer is null (should never happen
+    /// unless the writer was not properly initialized).
     pub fn apply(&self, cmd: O) {
         let hdl = unsafe {
             self.inner
                 .get()
                 .as_mut()
-                .expect("[apply]: called on uninit writer, for cmd: {cmd}")
+                .expect("[apply]: called on uninit writer")
         };
         hdl.append(cmd).publish();
     }
@@ -69,6 +72,7 @@ where
 /// - `Ok(Either::Left(cmd))` if applicable
 /// - `Ok(Either::Right(input))` to pass ownership back
 /// - `Err(error)` for malformed payload/parse errors
+#[allow(clippy::missing_errors_doc)]
 pub trait Command {
     type Cmd;
     type Input;
@@ -99,6 +103,8 @@ impl<T, C> LeftRight<T, C>
 where
     T: Absorb<C>,
 {
+    /// # Panics
+    /// Panics if the read handle has been dropped (should never happen in normal operation).
     pub fn read<F, R>(&self, f: F) -> R
     where
         F: FnOnce(&T) -> R,
@@ -125,6 +131,8 @@ impl<T> LeftRight<T, <T as Command>::Cmd>
 where
     T: Absorb<<T as Command>::Cmd> + Clone + Command,
 {
+    /// # Panics
+    /// Panics if this is not the owner shard (no write handle available).
     pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
         self.write
             .as_ref()
@@ -134,6 +142,7 @@ where
 }
 
 /// Public interface for state handlers.
+#[allow(clippy::missing_errors_doc)]
 pub trait State {
     type Output;
     type Input;
@@ -142,6 +151,7 @@ pub trait State {
     fn apply(&self, input: Self::Input) -> Result<Either<Self::Output, Self::Input>, Self::Error>;
 }
 
+#[allow(clippy::missing_errors_doc)]
 pub trait StateMachine {
     type Input;
     type Output;
@@ -153,7 +163,7 @@ pub trait StateMachine {
 ///
 /// # Generated items
 /// - `{$state}Inner` struct with the specified fields (the data)
-/// - `$state` wrapper struct (contains LeftRight storage)
+/// - `$state` wrapper struct (contains `LeftRight` storage)
 /// - `From<LeftRight<...>>` impl for `$state`
 /// - `From<{$state}Inner>` impl for `$state`
 ///
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 0f6d39dcc..4b6127873 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -35,7 +35,8 @@ impl<T> MuxStateMachine<T>
 where
     T: StateMachine,
 {
-    pub fn new(inner: T) -> Self {
+    #[must_use]
+    pub const fn new(inner: T) -> Self {
         Self { inner }
     }
 }
@@ -124,7 +125,7 @@ where
 {
     fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self, SnapshotError> {
         let inner = T::restore_snapshot(snapshot)?;
-        Ok(MuxStateMachine::new(inner))
+        Ok(Self::new(inner))
     }
 }
 
@@ -156,6 +157,8 @@ mod tests {
 
     #[test]
     fn mux_state_machine_snapshot_roundtrip() {
+        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+
         let users: Users = UsersInner::new().into();
         let streams: Streams = StreamsInner::new().into();
         let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
@@ -171,8 +174,6 @@ mod tests {
         assert!(snapshot.streams.is_some());
         assert!(snapshot.consumer_groups.is_some());
 
-        // Restore and verify
-        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
         let restored: MuxStateMachine<MuxTuple> =
             MuxStateMachine::restore_snapshot(&snapshot).unwrap();
 
@@ -189,11 +190,12 @@ mod tests {
         use crate::impls::metadata::IggySnapshot;
         use crate::stm::snapshot::Snapshot;
 
+        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+
         let users: Users = UsersInner::new().into();
         let streams: Streams = StreamsInner::new().into();
         let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
 
-        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
         let mux: MuxStateMachine<MuxTuple> =
             MuxStateMachine::new(variadic!(users, streams, consumer_groups));
 
diff --git a/core/metadata/src/stm/snapshot.rs b/core/metadata/src/stm/snapshot.rs
index 76210ceb6..6daadc03a 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -33,8 +33,8 @@ pub enum SnapshotError {
 impl fmt::Display for SnapshotError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            SnapshotError::Serialize(e) => write!(f, "snapshot serialization failed: {}", e),
-            SnapshotError::Deserialize(e) => write!(f, "snapshot deserialization failed: {}", e),
+            Self::Serialize(e) => write!(f, "snapshot serialization failed: {e}"),
+            Self::Deserialize(e) => write!(f, "snapshot deserialization failed: {e}"),
         }
     }
 }
@@ -42,8 +42,8 @@ impl fmt::Display for SnapshotError {
 impl std::error::Error for SnapshotError {
     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
         match self {
-            SnapshotError::Serialize(e) => Some(e),
-            SnapshotError::Deserialize(e) => Some(e),
+            Self::Serialize(e) => Some(e),
+            Self::Deserialize(e) => Some(e),
         }
     }
 }
@@ -75,6 +75,7 @@ impl Default for MetadataSnapshot {
 
 impl MetadataSnapshot {
     /// Create a new snapshot with the given sequence number.
+    #[must_use]
     pub fn new(sequence_number: u64) -> Self {
         Self {
             version: 1,
@@ -87,11 +88,17 @@ impl MetadataSnapshot {
     }
 
     /// Encode the snapshot to msgpack bytes.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError::Serialize` if msgpack serialization fails.
     pub fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
         rmp_serde::to_vec(self).map_err(SnapshotError::Serialize)
     }
 
     /// Decode a snapshot from msgpack bytes.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError::Deserialize` if msgpack deserialization fails.
     pub fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
         rmp_serde::from_slice(bytes).map_err(SnapshotError::Deserialize)
     }
@@ -101,6 +108,7 @@ impl MetadataSnapshot {
 ///
 /// This is the high-level interface that concrete snapshot types (e.g. `IggySnapshot`)
 /// must satisfy. It provides methods for creating, encoding, and decoding snapshots.
+#[allow(clippy::missing_errors_doc)]
 pub trait Snapshot: Sized {
     /// The error type for snapshot operations.
     type Error: std::error::Error;
@@ -139,6 +147,7 @@ pub trait Snapshot: Sized {
 /// Trait implemented by each `{Name}Inner` state machine to support snapshotting.
 /// Each state machine defines its own snapshot
 /// type for serialization and provides conversion methods.
+#[allow(clippy::missing_errors_doc)]
 pub trait Snapshotable {
     /// The serde-serializable snapshot representation of this state.
     /// This should be a plain struct with only serializable types and no wrappers
@@ -156,7 +165,8 @@ pub trait Snapshotable {
 
 /// Trait for filling a typed snapshot with state machine data.
 ///
-/// Each state machine implements this to write its serialized state
+/// Each state machine implements this to write its serialized state.
+#[allow(clippy::missing_errors_doc)]
 pub trait FillSnapshot<S> {
     /// Fill the snapshot with this state machine's data.
     fn fill_snapshot(&self, snapshot: &mut S) -> Result<(), SnapshotError>;
@@ -165,6 +175,7 @@ pub trait FillSnapshot<S> {
 /// Trait for restoring state machine data from a typed snapshot.
 ///
 /// Each state machine implements this to read its state.
+#[allow(clippy::missing_errors_doc)]
 pub trait RestoreSnapshot<S>: Sized {
     /// Restore this state machine from the snapshot.
     fn restore_snapshot(snapshot: &S) -> Result<Self, SnapshotError>;
@@ -249,7 +260,7 @@ mod tests {
 
     #[test]
     fn roundtrip_with_data() {
-        let ts = IggyTimestamp::from(1694968446131680u64);
+        let ts = IggyTimestamp::from(1_694_968_446_131_680_u64);
 
         let mut snapshot = MetadataSnapshot::new(100);
         snapshot.streams = Some(StreamsSnapshot {
@@ -295,7 +306,7 @@ mod tests {
         use crate::stm::user::{PermissionerSnapshot, UserSnapshot, UsersSnapshot};
         use iggy_common::UserStatus;
 
-        let ts = IggyTimestamp::from(1694968446131680u64);
+        let ts = IggyTimestamp::from(1_694_968_446_131_680_u64);
 
         let users_snap = UsersSnapshot {
             items: vec![
@@ -373,8 +384,8 @@ mod tests {
         let encoded = snapshot.encode().unwrap();
         let decoded = MetadataSnapshot::decode(&encoded).unwrap();
 
-        use crate::stm::user::Users;
-        let restored_users: Users = RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+        let restored_users: crate::stm::user::Users =
+            RestoreSnapshot::restore_snapshot(&decoded).unwrap();
 
         let mut verify = MetadataSnapshot::new(0);
         restored_users.fill_snapshot(&mut verify).unwrap();
@@ -387,8 +398,8 @@ mod tests {
         assert_eq!(users_snap.items[1].1.username, "charlie");
         assert_eq!(users_snap.items[1].1.id, 2);
 
-        use crate::stm::stream::Streams;
-        let restored_streams: Streams = RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+        let restored_streams: crate::stm::stream::Streams =
+            RestoreSnapshot::restore_snapshot(&decoded).unwrap();
 
         let mut verify = MetadataSnapshot::new(0);
         restored_streams.fill_snapshot(&mut verify).unwrap();
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 9bc6f4668..df610a6a1 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -51,7 +51,8 @@ pub struct Partition {
 }
 
 impl Partition {
-    pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+    #[must_use]
+    pub const fn new(id: usize, created_at: IggyTimestamp) -> Self {
         Self { id, created_at }
     }
 }
@@ -184,6 +185,7 @@ impl Clone for Stream {
 }
 
 impl Stream {
+    #[must_use]
     pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self {
         Self {
             id: 0,
@@ -195,6 +197,7 @@ impl Stream {
         }
     }
 
+    #[must_use]
     pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats: Arc<StreamStats>) -> Self {
         Self {
             id: 0,
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 8e2c7897e..8e0ec5d8e 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -64,7 +64,8 @@ impl Default for User {
 }
 
 impl User {
-    pub fn new(
+    #[must_use]
+    pub const fn new(
         username: Arc<str>,
         password_hash: Arc<str>,
         status: UserStatus,
@@ -126,6 +127,7 @@ impl UsersInner {
 // TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes.
 impl StateHandler for CreateUser {
     type State = UsersInner;
+    #[allow(clippy::cast_possible_truncation)]
     fn apply(&self, state: &mut UsersInner) -> Bytes {
         let username_arc: Arc<str> = Arc::from(self.username.as_str());
         if state.index.contains_key(&username_arc) {
@@ -156,6 +158,7 @@ impl StateHandler for CreateUser {
 
 impl StateHandler for UpdateUser {
     type State = UsersInner;
+    #[allow(clippy::cast_possible_truncation)]
     fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return Bytes::new();
@@ -187,6 +190,7 @@ impl StateHandler for UpdateUser {
 
 impl StateHandler for DeleteUser {
     type State = UsersInner;
+    #[allow(clippy::cast_possible_truncation)]
     fn apply(&self, state: &mut UsersInner) -> Bytes {
         let Some(user_id) = state.resolve_user_id(&self.user_id) else {
             return Bytes::new();
@@ -406,6 +410,7 @@ impl Snapshotable for Users {
         })
     }
 
+    #[allow(clippy::cast_possible_truncation)]
     fn from_snapshot(
         snapshot: Self::Snapshot,
     ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index c56af72b1..c355ea509 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -40,4 +40,4 @@ tracing = { workspace = true }
 [lints.clippy]
 enum_glob_use = "deny"
 pedantic = "deny"
-nursery = "deny"
+nursery = "warn"
diff --git a/core/shard/Cargo.toml b/core/shard/Cargo.toml
index 5180ba829..387f5f7ef 100644
--- a/core/shard/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -33,3 +33,8 @@ metadata = { path = "../metadata" }
 papaya = { workspace = true }
 partitions = { path = "../partitions" }
 tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 2b8b3138b..e41e210a6 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -44,6 +44,7 @@ pub type Sender<T> = crossfire::MTx<crossfire::mpsc::Array<T>>;
 pub type Receiver<T> = crossfire::AsyncRx<crossfire::mpsc::Array<T>>;
 
 /// Create a bounded mpsc channel with a blocking sender and async receiver.
+#[must_use]
 pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
     crossfire::mpsc::bounded_blocking_async(capacity)
 }
@@ -64,7 +65,8 @@ pub struct ShardFrame<R: Send + 'static = ()> {
 
 impl<R: Send + 'static> ShardFrame<R> {
     /// Create a fire-and-forget frame (no caller waiting for completion).
-    pub fn fire_and_forget(message: Message<GenericHeader>) -> Self {
+    #[must_use]
+    pub const fn fire_and_forget(message: Message<GenericHeader>) -> Self {
         Self {
             message,
             response_sender: None,
@@ -116,7 +118,8 @@ where
     /// * `senders` - one sender per shard in the cluster (indexed by shard id).
     /// * `inbox` - the receiver that this shard drains in its message pump.
     /// * `shards_table` - namespace -> shard routing table.
-    pub fn new(
+    #[must_use]
+    pub const fn new(
         id: u16,
         name: String,
         metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
@@ -136,7 +139,8 @@ where
         }
     }
 
-    pub fn shards_table(&self) -> &T {
+    #[must_use]
+    pub const fn shards_table(&self) -> &T {
         &self.shards_table
     }
 }
@@ -151,6 +155,7 @@ where
     ///
     /// Routes requests, replication messages, and acks to either the metadata
     /// plane or the partitions plane based on `PlaneIdentity::is_applicable`.
+    #[allow(clippy::future_not_send)]
     pub async fn on_message(&self, message: Message<GenericHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -173,6 +178,7 @@ where
         }
     }
 
+    #[allow(clippy::future_not_send)]
     pub async fn on_request(&self, request: Message<RequestHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -191,6 +197,7 @@ where
         self.plane.on_request(request).await;
     }
 
+    #[allow(clippy::future_not_send)]
     pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -209,6 +216,7 @@ where
         self.plane.on_replicate(prepare).await;
     }
 
+    #[allow(clippy::future_not_send)]
     pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -235,6 +243,10 @@ where
     /// Invariant: planes do not produce loopback messages for each other.
     /// `on_ack` commits and applies but never calls `push_loopback`, so
     /// draining metadata before partitions is order-independent.
+    ///
+    /// # Panics
+    /// Panics if a loopback message is not a valid `PrepareOk` message.
+    #[allow(clippy::future_not_send)]
     pub async fn process_loopback(&self, buf: &mut Vec<Message<GenericHeader>>) -> usize
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 8102fef00..0a68b98bf 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -41,7 +41,7 @@ where
     /// the correct shard's message pump.
     ///
     /// Decomposes the generic message into its typed form (Request, Prepare,
-    /// or PrepareOk) to access the operation and namespace, then resolves
+    /// or `PrepareOk`) to access the operation and namespace, then resolves
     /// the target shard and enqueues the message via its channel sender.
     pub fn dispatch(&self, message: Message<GenericHeader>) {
         let (operation, namespace, generic) = match MessageBag::from(message) {
@@ -127,6 +127,7 @@ where
     }
 
     /// Drain this shard's inbox and process each frame locally.
+    #[allow(clippy::future_not_send)]
     pub async fn run_message_pump(&self, stop: Receiver<()>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -160,6 +161,7 @@ where
         }
     }
 
+    #[allow(clippy::future_not_send)]
     async fn process_frame(&self, frame: ShardFrame<R>)
     where
         B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/shard/src/shards_table.rs b/core/shard/src/shards_table.rs
index 719f52e10..47834a8f6 100644
--- a/core/shard/src/shards_table.rs
+++ b/core/shard/src/shards_table.rs
@@ -22,8 +22,8 @@ use std::hash::Hasher as _;
 /// Lookup table that maps partition namespaces to their owning shard.
 ///
 /// Implementations can be:
-/// - A shared concurrent map (DashMap, papaya, etc.) referenced by all shards.
-/// - A per-shard local `HashMap` replica, updated via
+/// - A shared concurrent map (`DashMap`, papaya, etc.) referenced by all shards.
+/// - A per-shard local `HashMap` replica, updated via a
 ///   broadcast when partitions are created, deleted, or moved.
 pub trait ShardsTable {
     /// Returns the shard id that owns `namespace`, or `None` if the
@@ -51,12 +51,14 @@ impl Default for PapayaShardsTable {
 }
 
 impl PapayaShardsTable {
+    #[must_use]
     pub fn new() -> Self {
         Self {
             inner: papaya::HashMap::new(),
         }
     }
 
+    #[must_use]
     pub fn with_capacity(capacity: usize) -> Self {
         Self {
             inner: papaya::HashMap::with_capacity(capacity),
@@ -85,6 +87,7 @@ impl ShardsTable for PapayaShardsTable {
 /// Given a packed `IggyNamespace` and the total number of shards, returns the
 /// shard id that should own the partition.  The upper bits of the Murmur3 hash
 /// are used to avoid the weak lower bits for small integer inputs.
+#[must_use]
 pub fn calculate_shard_assignment(ns: &IggyNamespace, shard_count: u32) -> u16 {
     let mut hasher = Murmur3Hasher::default();
     hasher.write_u64(ns.inner());


Reply via email to