This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 69188127e chore(simulator): enable pedantic and nursery clippy lints 
(#2895)
69188127e is described below

commit 69188127e0f95c314b1a8ab149f1b7373361f2ab
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 11:18:03 2026 +0100

    chore(simulator): enable pedantic and nursery clippy lints (#2895)
---
 core/simulator/Cargo.toml         |  5 ++++
 core/simulator/src/bus.rs         | 55 ++++++++++++++++++---------------------
 core/simulator/src/client.rs      | 30 +++++++++++----------
 core/simulator/src/deps.rs        |  5 +++-
 core/simulator/src/lib.rs         | 14 +++++++---
 core/simulator/src/main.rs        | 13 +++++----
 core/simulator/src/network.rs     | 12 ++++++---
 core/simulator/src/packet.rs      | 40 ++++++++++++++++++++--------
 core/simulator/src/ready_queue.rs | 14 +++++++---
 core/simulator/src/replica.rs     | 18 +++++++++----
 10 files changed, 128 insertions(+), 78 deletions(-)

diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 1051c77b5..559f5fae9 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -35,3 +35,8 @@ rand = { workspace = true }
 rand_xoshiro = { workspace = true }
 shard = { path = "../shard" }
 tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "warn"
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index b4dc1e52a..371258aef 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -17,7 +17,7 @@
 
 use iggy_common::{IggyError, header::GenericHeader, message::Message};
 use message_bus::MessageBus;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashSet, VecDeque};
 use std::ops::Deref;
 use std::sync::{Arc, Mutex};
 
@@ -36,21 +36,25 @@ pub struct Envelope {
 // I think the way we could handle that is by having an dedicated collection 
for client responses (clients_table).
 #[derive(Debug, Default)]
 pub struct MemBus {
-    clients: Mutex<HashMap<u128, ()>>,
-    replicas: Mutex<HashMap<u8, ()>>,
+    clients: Mutex<HashSet<u128>>,
+    replicas: Mutex<HashSet<u8>>,
     pending_messages: Mutex<VecDeque<Envelope>>,
 }
 
 impl MemBus {
+    #[must_use]
     pub fn new() -> Self {
         Self {
-            clients: Mutex::new(HashMap::new()),
-            replicas: Mutex::new(HashMap::new()),
+            clients: Mutex::new(HashSet::new()),
+            replicas: Mutex::new(HashSet::new()),
             pending_messages: Mutex::new(VecDeque::new()),
         }
     }
 
     /// Get the next pending message from the bus
+    ///
+    /// # Panics
+    /// Panics if the internal mutex is poisoned.
     pub fn receive(&self) -> Option<Envelope> {
         self.pending_messages.lock().unwrap().pop_front()
     }
@@ -63,27 +67,27 @@ impl MessageBus for MemBus {
     type Sender = ();
 
     fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> 
bool {
-        if self.clients.lock().unwrap().contains_key(&client) {
+        if self.clients.lock().unwrap().contains(&client) {
             return false;
         }
-        self.clients.lock().unwrap().insert(client, ());
+        self.clients.lock().unwrap().insert(client);
         true
     }
 
     fn remove_client(&mut self, client: Self::Client) -> bool {
-        self.clients.lock().unwrap().remove(&client).is_some()
+        self.clients.lock().unwrap().remove(&client)
     }
 
     fn add_replica(&mut self, replica: Self::Replica) -> bool {
-        if self.replicas.lock().unwrap().contains_key(&replica) {
+        if self.replicas.lock().unwrap().contains(&replica) {
             return false;
         }
-        self.replicas.lock().unwrap().insert(replica, ());
+        self.replicas.lock().unwrap().insert(replica);
         true
     }
 
     fn remove_replica(&mut self, replica: Self::Replica) -> bool {
-        self.replicas.lock().unwrap().remove(&replica).is_some()
+        self.replicas.lock().unwrap().remove(&replica)
     }
 
     async fn send_to_client(
@@ -91,7 +95,8 @@ impl MessageBus for MemBus {
         client_id: Self::Client,
         message: Self::Data,
     ) -> Result<(), IggyError> {
-        if !self.clients.lock().unwrap().contains_key(&client_id) {
+        if !self.clients.lock().unwrap().contains(&client_id) {
+            #[allow(clippy::cast_possible_truncation)]
             return Err(IggyError::ClientNotFound(client_id as u32));
         }
 
@@ -110,8 +115,8 @@ impl MessageBus for MemBus {
         replica: Self::Replica,
         message: Self::Data,
     ) -> Result<(), IggyError> {
-        if !self.replicas.lock().unwrap().contains_key(&replica) {
-            return Err(IggyError::ResourceNotFound(format!("Replica {}", 
replica)));
+        if !self.replicas.lock().unwrap().contains(&replica) {
+            return Err(IggyError::ResourceNotFound(format!("Replica 
{replica}")));
         }
 
         self.pending_messages.lock().unwrap().push_back(Envelope {
@@ -125,7 +130,7 @@ impl MessageBus for MemBus {
     }
 }
 
-/// Newtype wrapper for shared MemBus that implements MessageBus
+/// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
 #[derive(Debug, Clone)]
 pub struct SharedMemBus(pub Arc<MemBus>);
 
@@ -142,30 +147,20 @@ impl MessageBus for SharedMemBus {
     type Data = Message<GenericHeader>;
     type Sender = ();
 
-    fn add_client(&mut self, client: Self::Client, sender: Self::Sender) -> 
bool {
-        self.0
-            .clients
-            .lock()
-            .unwrap()
-            .insert(client, sender)
-            .is_none()
+    fn add_client(&mut self, client: Self::Client, _sender: Self::Sender) -> 
bool {
+        self.0.clients.lock().unwrap().insert(client)
     }
 
     fn remove_client(&mut self, client: Self::Client) -> bool {
-        self.0.clients.lock().unwrap().remove(&client).is_some()
+        self.0.clients.lock().unwrap().remove(&client)
     }
 
     fn add_replica(&mut self, replica: Self::Replica) -> bool {
-        self.0
-            .replicas
-            .lock()
-            .unwrap()
-            .insert(replica, ())
-            .is_none()
+        self.0.replicas.lock().unwrap().insert(replica)
     }
 
     fn remove_replica(&mut self, replica: Self::Replica) -> bool {
-        self.0.replicas.lock().unwrap().remove(&replica).is_some()
+        self.0.replicas.lock().unwrap().remove(&replica)
     }
 
     async fn send_to_client(
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index e254cb022..7b55af548 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -32,7 +32,8 @@ pub struct SimClient {
 }
 
 impl SimClient {
-    pub fn new(client_id: u128) -> Self {
+    #[must_use]
+    pub const fn new(client_id: u128) -> Self {
         Self {
             client_id,
             request_counter: Cell::new(0),
@@ -51,22 +52,25 @@ impl SimClient {
         };
         let payload = create_stream.to_bytes();
 
-        self.build_request(Operation::CreateStream, payload)
+        self.build_request(Operation::CreateStream, &payload)
     }
 
+    /// # Panics
+    /// Panics if the stream name cannot be converted to an `Identifier`.
     pub fn delete_stream(&self, name: &str) -> Message<RequestHeader> {
         let delete_stream = DeleteStream {
             stream_id: Identifier::named(name).unwrap(),
         };
         let payload = delete_stream.to_bytes();
 
-        self.build_request(Operation::DeleteStream, payload)
+        self.build_request(Operation::DeleteStream, &payload)
     }
 
+    #[allow(clippy::cast_possible_truncation)]
     pub fn send_messages(
         &self,
         namespace: IggyNamespace,
-        messages: Vec<&[u8]>,
+        messages: &[&[u8]],
     ) -> Message<RequestHeader> {
         // Build batch: count | indexes | messages
         let count = messages.len() as u32;
@@ -74,7 +78,7 @@ impl SimClient {
         let mut messages_buf = Vec::new();
 
         let mut current_position = 0u32;
-        for msg in &messages {
+        for msg in messages {
             // Write index: position (u32) + length (u32)
             indexes.extend_from_slice(&current_position.to_le_bytes());
             indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());
@@ -90,17 +94,14 @@ impl SimClient {
         payload.extend_from_slice(&indexes);
         payload.extend_from_slice(&messages_buf);
 
-        self.build_request_with_namespace(
-            Operation::SendMessages,
-            bytes::Bytes::from(payload),
-            namespace,
-        )
+        self.build_request_with_namespace(Operation::SendMessages, &payload, 
namespace)
     }
 
+    #[allow(clippy::cast_possible_truncation)]
     fn build_request_with_namespace(
         &self,
         operation: Operation,
-        payload: bytes::Bytes,
+        payload: &[u8],
         namespace: IggyNamespace,
     ) -> Message<RequestHeader> {
         use bytes::Bytes;
@@ -130,13 +131,14 @@ impl SimClient {
         let header_bytes = bytemuck::bytes_of(&header);
         let mut buffer = Vec::with_capacity(total_size);
         buffer.extend_from_slice(header_bytes);
-        buffer.extend_from_slice(&payload);
+        buffer.extend_from_slice(payload);
 
         Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
             .expect("failed to build request message")
     }
 
-    fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> 
Message<RequestHeader> {
+    #[allow(clippy::cast_possible_truncation)]
+    fn build_request(&self, operation: Operation, payload: &[u8]) -> 
Message<RequestHeader> {
         use bytes::Bytes;
 
         let header_size = std::mem::size_of::<RequestHeader>();
@@ -163,7 +165,7 @@ impl SimClient {
         let header_bytes = bytemuck::bytes_of(&header);
         let mut buffer = Vec::with_capacity(total_size);
         buffer.extend_from_slice(header_bytes);
-        buffer.extend_from_slice(&payload);
+        buffer.extend_from_slice(payload);
 
         Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
             .expect("failed to build request message")
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 622704820..12e78bb05 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -34,6 +34,7 @@ pub struct MemStorage {
     offset: Cell<u64>,
 }
 
+#[allow(clippy::future_not_send)]
 impl Storage for MemStorage {
     type Buffer = Vec<u8>;
 
@@ -74,6 +75,7 @@ impl<S: Storage + Default> Default for SimJournal<S> {
     }
 }
 
+#[allow(clippy::missing_fields_in_debug)]
 impl<S: Storage> std::fmt::Debug for SimJournal<S> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SimJournal")
@@ -85,6 +87,7 @@ impl<S: Storage> std::fmt::Debug for SimJournal<S> {
     }
 }
 
+#[allow(clippy::future_not_send)]
 impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
     type Header = PrepareHeader;
     type Entry = Message<PrepareHeader>;
@@ -137,7 +140,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
 
 impl JournalHandle for SimJournal<MemStorage> {
     type Storage = MemStorage;
-    type Target = SimJournal<MemStorage>;
+    type Target = Self;
 
     fn handle(&self) -> &Self::Target {
         self
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 260b76488..f3cf63ea3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -43,6 +43,7 @@ impl Simulator {
         }
     }
 
+    #[allow(clippy::cast_possible_truncation)]
     pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> 
Self {
         let mut message_bus = MemBus::new();
         for client in clients {
@@ -58,8 +59,8 @@ impl Simulator {
             .map(|i| {
                 new_replica(
                     i as u8,
-                    format!("replica-{}", i),
-                    Arc::clone(&message_bus),
+                    format!("replica-{i}"),
+                    &message_bus,
                     replica_count as u8,
                 )
             })
@@ -71,6 +72,7 @@ impl Simulator {
         }
     }
 
+    #[allow(clippy::cast_possible_truncation)]
     pub fn with_message_bus(replica_count: usize, mut message_bus: MemBus) -> 
Self {
         for i in 0..replica_count as u8 {
             message_bus.add_replica(i);
@@ -81,8 +83,8 @@ impl Simulator {
             .map(|i| {
                 new_replica(
                     i as u8,
-                    format!("replica-{}", i),
-                    Arc::clone(&message_bus),
+                    format!("replica-{i}"),
+                    &message_bus,
                     replica_count as u8,
                 )
             })
@@ -96,6 +98,9 @@ impl Simulator {
 }
 
 impl Simulator {
+    /// # Panics
+    /// Panics if a client response message has an invalid command type.
+    #[allow(clippy::future_not_send)]
     pub async fn step(&self) -> Option<Message<ReplyHeader>> {
         if let Some(envelope) = self.message_bus.receive() {
             if let Some(_client_id) = envelope.to_client {
@@ -116,6 +121,7 @@ impl Simulator {
         None
     }
 
+    #[allow(clippy::future_not_send)]
     async fn dispatch_to_replica(
         &self,
         replica: &Replica,
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 8d519cdda..de2288273 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -49,7 +49,7 @@ fn main() {
     let test_namespace = IggyNamespace::new(1, 1, 0);
 
     // Initialize partition on all replicas
-    println!("[sim] Initializing test partition: {:?}", test_namespace);
+    println!("[sim] Initializing test partition: {test_namespace:?}");
     sim.init_partition(test_namespace);
 
     // Responses queue
@@ -70,13 +70,14 @@ fn main() {
                 b"Message 3".as_slice(),
             ];
 
-            let send_msg = client.send_messages(test_namespace, test_messages);
+            let send_msg = client.send_messages(test_namespace, 
&test_messages);
             bus.send_to_replica(leader, send_msg.into_generic())
                 .await
                 .expect("failed to send messages");
 
             loop {
-                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                let reply = responses_clone.lock().unwrap().pop();
+                if let Some(reply) = reply {
                     println!("[client] Got send_messages reply: {:?}", 
reply.header());
                     break;
                 }
@@ -90,7 +91,8 @@ fn main() {
                 .expect("failed to send create_stream");
 
             loop {
-                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                let reply = responses_clone.lock().unwrap().pop();
+                if let Some(reply) = reply {
                     println!("[client] Got create_stream reply: {:?}", 
reply.header());
                     break;
                 }
@@ -103,7 +105,8 @@ fn main() {
                 .expect("failed to send delete_stream");
 
             loop {
-                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                let reply = responses_clone.lock().unwrap().pop();
+                if let Some(reply) = reply {
                     println!("[client] Got delete_stream reply: {:?}", 
reply.header());
                     break;
                 }
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
index 91cbed788..d5b7b7b61 100644
--- a/core/simulator/src/network.rs
+++ b/core/simulator/src/network.rs
@@ -18,7 +18,7 @@
 //! Network abstraction layer for the cluster simulator.
 //!
 //! **Note:** Currently a thin passthrough over `PacketSimulator`. Once the
-//! Cluster and MessageBus layers are built, this will own
+//! Cluster and [`MessageBus`] layers are built, this will own
 //! process-to-bus routing, and node enable/disable logic.
 
 use crate::packet::{
@@ -40,6 +40,7 @@ pub struct Network {
 
 impl Network {
     /// Create a new network.
+    #[must_use]
     pub fn new(options: PacketSimulatorOptions) -> Self {
         Self {
             simulator: PacketSimulator::new(options),
@@ -50,8 +51,8 @@ impl Network {
     ///
     /// The message will be queued with a simulated delay and may be:
     /// - Delivered normally after the delay
-    /// - Dropped (based on packet_loss_probability)
-    /// - Replayed/duplicated (based on replay_probability)
+    /// - Dropped (based on `packet_loss_probability`)
+    /// - Replayed/duplicated (based on `replay_probability`)
     pub fn submit(&mut self, from: ProcessId, to: ProcessId, message: 
Message<GenericHeader>) {
         self.simulator.submit(from, to, message);
     }
@@ -80,7 +81,8 @@ impl Network {
     }
 
     /// Get the current network tick.
-    pub fn current_tick(&self) -> u64 {
+    #[must_use]
+    pub const fn current_tick(&self) -> u64 {
         self.simulator.current_tick()
     }
 
@@ -105,6 +107,7 @@ impl Network {
     }
 
     /// Check whether a specific link is enabled (filter is not empty).
+    #[must_use]
     pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
         self.simulator.is_link_enabled(from, to)
     }
@@ -144,6 +147,7 @@ impl Network {
     }
 
     /// Get the number of packets currently in flight.
+    #[must_use]
     pub fn packets_in_flight(&self) -> usize {
         self.simulator.packets_in_flight()
     }
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
index 3981ad7bd..59317adcc 100644
--- a/core/simulator/src/packet.rs
+++ b/core/simulator/src/packet.rs
@@ -142,11 +142,11 @@ impl Default for PacketSimulatorOptions {
     }
 }
 
-/// Per-path link: holds packets in a ReadyQueue sorted by ready_at.
+/// Per-path link: holds packets in a [`ReadyQueue`] sorted by `ready_at`.
 struct Link {
-    /// Packets waiting to be delivered, ordered by ready_at (min-heap).
+    /// Packets waiting to be delivered, ordered by `ready_at` (min-heap).
     packets: ReadyQueue<Packet>,
-    /// Tick until which this link is clogged. Clogged when clogged_till > 
current_tick.
+    /// Tick until which this link is clogged. Clogged when `clogged_till` > 
`current_tick`.
     clogged_till: u64,
     /// Per-command filter controlling which commands pass through this link.
     /// [`ALLOW_ALL`] = fully enabled (default), [`BLOCK_ALL`] = fully 
disabled.
@@ -156,6 +156,7 @@ struct Link {
     drop_packet_fn: Option<fn(&Packet) -> bool>,
 }
 
+#[allow(clippy::missing_fields_in_debug)]
 impl std::fmt::Debug for Link {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Link")
@@ -206,11 +207,11 @@ pub enum PartitionSymmetry {
 /// The packet simulator manages a matrix of links between processes.
 pub struct PacketSimulator {
     options: PacketSimulatorOptions,
-    /// Flat array of links. Index = from_idx * max_processes + to_idx.
+    /// Flat array of links. Index = `from_idx` * `max_processes` + `to_idx`.
     links: Vec<Link>,
     /// Maximum number of processes (determines link array size).
     max_processes: usize,
-    /// Mapping from ProcessId to flat index.
+    /// Mapping from [`ProcessId`] to flat index.
     process_indices: HashMap<ProcessId, usize>,
     /// Next flat index to assign to a newly registered client.
     /// Initialized to `replica_count` and incremented on each new 
registration.
@@ -225,7 +226,7 @@ pub struct PacketSimulator {
     auto_partition: Vec<bool>,
     /// Countdown timer for partition/unpartition stability.
     auto_partition_stability: u32,
-    /// Scratch buffer for Fisher-Yates shuffle in UniformSize partition mode.
+    /// Scratch buffer for Fisher-Yates shuffle in [`UniformSize`] partition 
mode.
     auto_partition_nodes: Vec<usize>,
     /// Reusable buffer for delivered packets.
     delivered: Vec<Packet>,
@@ -233,11 +234,16 @@ pub struct PacketSimulator {
 
 impl PacketSimulator {
     /// Create a new packet simulator.
+    ///
+    /// # Panics
+    /// Panics if `node_count` is 0, or if delay/probability parameters are 
invalid.
+    #[must_use]
+    #[allow(clippy::cast_possible_truncation)]
     pub fn new(options: PacketSimulatorOptions) -> Self {
         let node_count = options.node_count as usize;
         let client_count = options.client_count as usize;
 
-        assert!(node_count > 0, "node_count must be > 0, got {}", node_count);
+        assert!(node_count > 0, "node_count must be > 0, got {node_count}");
         assert!(
             options.one_way_delay_min >= 1,
             "one_way_delay_min must be >= 1 (got {}), zero causes unbounded 
replay loops",
@@ -310,6 +316,9 @@ impl PacketSimulator {
 
     /// Register a client process. Returns its flat index.
     /// Re-registering a client with the same ID returns the same index.
+    ///
+    /// # Panics
+    /// Panics if the maximum number of processes has been reached.
     pub fn register_client(&mut self, client_id: u128) -> usize {
         if let Some(&idx) = 
self.process_indices.get(&ProcessId::Client(client_id)) {
             return idx;
@@ -327,7 +336,7 @@ impl PacketSimulator {
     }
 
     /// Resolve a `ProcessId` to its flat index.
-    /// Fast path for replicas (direct arithmetic), HashMap fallback for 
clients.
+    /// Fast path for replicas (direct arithmetic), `HashMap` fallback for 
clients.
     fn process_index(&self, id: ProcessId) -> Option<usize> {
         match id {
             ProcessId::Replica(i) => {
@@ -391,6 +400,11 @@ impl PacketSimulator {
 
     /// Generate an exponentially distributed random value with the given mean.
     /// Uses inverse CDF: -mean * ln(U) where U ~ Uniform(0,1).
+    #[allow(
+        clippy::cast_precision_loss,
+        clippy::cast_sign_loss,
+        clippy::cast_possible_truncation
+    )]
     fn random_exponential(prng: &mut Xoshiro256Plus, mean: u64) -> u64 {
         let u: f64 = prng.random::<f64>();
         if u > 0.0 {
@@ -402,7 +416,7 @@ impl PacketSimulator {
     }
 
     /// Number of registered processes.
-    fn process_count(&self) -> usize {
+    const fn process_count(&self) -> usize {
         self.next_index
     }
 
@@ -417,6 +431,7 @@ impl PacketSimulator {
     }
 
     /// Check whether a link is currently enabled (filter is not empty).
+    #[must_use]
     pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
         let idx = self.link_index(from, to);
         !self.links[idx].filter.is_empty()
@@ -656,7 +671,8 @@ impl PacketSimulator {
     }
 
     /// Get the current tick.
-    pub fn current_tick(&self) -> u64 {
+    #[must_use]
+    pub const fn current_tick(&self) -> u64 {
         self.current_tick
     }
 
@@ -695,11 +711,13 @@ impl PacketSimulator {
     }
 
     /// Get the number of packets in flight across all links.
+    #[must_use]
     pub fn packets_in_flight(&self) -> usize {
         self.links.iter().map(|l| l.packets.len()).sum()
     }
 }
 
+#[allow(clippy::missing_fields_in_debug)]
 impl std::fmt::Debug for PacketSimulator {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("PacketSimulator")
@@ -830,7 +848,7 @@ mod tests {
         sim.clog(ProcessId::Replica(0), ProcessId::Replica(1));
 
         // Submit a packet
-        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg.clone());
+        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg);
         for _ in 0..20 {
             sim.tick();
         }
diff --git a/core/simulator/src/ready_queue.rs 
b/core/simulator/src/ready_queue.rs
index 0bd91c156..2bb79107b 100644
--- a/core/simulator/src/ready_queue.rs
+++ b/core/simulator/src/ready_queue.rs
@@ -42,10 +42,12 @@ impl<T: Ready> Default for ReadyQueue<T> {
 }
 
 impl<T: Ready> ReadyQueue<T> {
-    pub fn new() -> Self {
+    #[must_use]
+    pub const fn new() -> Self {
         Self { items: Vec::new() }
     }
 
+    #[must_use]
     pub fn with_capacity(capacity: usize) -> Self {
         Self {
             items: Vec::with_capacity(capacity),
@@ -60,12 +62,13 @@ impl<T: Ready> ReadyQueue<T> {
     }
 
     /// Peek at the item with the smallest `ready_at`.
+    #[must_use]
     pub fn peek(&self) -> Option<&T> {
         self.items.first()
     }
 
     /// Reset the queue, removing all items but retaining the allocation.
-    /// Matches TigerBeetle's `reset()` which sets `items.len = 0`.
+    /// Matches `TigerBeetle`'s `reset()` which sets `items.len = 0`.
     pub fn clear(&mut self) {
         self.items.clear();
     }
@@ -120,16 +123,19 @@ impl<T: Ready> ReadyQueue<T> {
     }
 
     /// Number of items in the queue.
-    pub fn len(&self) -> usize {
+    #[must_use]
+    pub const fn len(&self) -> usize {
         self.items.len()
     }
 
     /// Whether the queue is empty.
-    pub fn is_empty(&self) -> bool {
+    #[must_use]
+    pub const fn is_empty(&self) -> bool {
         self.items.is_empty()
     }
 
     /// Access all items in unspecified order.
+    #[must_use]
     pub fn as_slice(&self) -> &[T] {
         &self.items
     }
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ddd423f11..5711da0ce 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -36,7 +36,7 @@ const CLUSTER_ID: u128 = 1;
 pub type Replica =
     shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
 
-pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) 
-> Replica {
+pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, replica_count: u8) 
-> Replica {
     let users: Users = UsersInner::new().into();
     let streams: Streams = StreamsInner::new().into();
     let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
@@ -48,7 +48,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, 
replica_count: u8) ->
         id,
         replica_count,
         0,
-        SharedMemBus(Arc::clone(&bus)),
+        SharedMemBus(Arc::clone(bus)),
         LocalPipeline::new(),
     );
     metadata_consensus.init();
@@ -67,7 +67,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, 
replica_count: u8) ->
         segment_size: IggyByteSize::from(1024 * 1024 * 1024),
     };
 
-    let mut partitions = IggyPartitions::new(ShardId::new(id as u16), 
partitions_config);
+    let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)), 
partitions_config);
 
     // TODO: namespace=0 collides with metadata consensus. Safe for now 
because the simulator
     // routes by Operation type, but a shared view change bus would produce 
namespace collisions.
@@ -76,7 +76,7 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, 
replica_count: u8) ->
         id,
         replica_count,
         0,
-        SharedMemBus(Arc::clone(&bus)),
+        SharedMemBus(Arc::clone(bus)),
         NamespacedPipeline::new(),
     );
     partition_consensus.init();
@@ -86,5 +86,13 @@ pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, 
replica_count: u8) ->
     // but this is not possible with crossfire without mangling types due to 
Flavor trait in crossfire.
     // This needs to be revisited in the future.
     let (_tx, inbox) = shard::channel(1024);
-    shard::IggyShard::new(id as u16, name, metadata, partitions, Vec::new(), 
inbox, ())
+    shard::IggyShard::new(
+        u16::from(id),
+        name,
+        metadata,
+        partitions,
+        Vec::new(),
+        inbox,
+        (),
+    )
 }

Reply via email to