This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new bf460c3ae chore(partitions): enable pedantic and nursery clippy lints (#2876)
bf460c3ae is described below

commit bf460c3aeb66279ec781219a2ef9118a693804d2
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 10:23:17 2026 +0100

    chore(partitions): enable pedantic and nursery clippy lints (#2876)
    
    Co-authored-by: Grzegorz Koszyk <[email protected]>
---
 core/partitions/Cargo.toml             |  5 ++++
 core/partitions/src/iggy_partition.rs  | 13 ++++----
 core/partitions/src/iggy_partitions.rs | 55 ++++++++++++++++++++--------------
 core/partitions/src/journal.rs         |  4 +--
 core/partitions/src/lib.rs             |  4 +++
 core/partitions/src/log.rs             | 22 +++++++-------
 core/partitions/src/types.rs           | 38 +++++++++++++----------
 7 files changed, 81 insertions(+), 60 deletions(-)

diff --git a/core/partitions/Cargo.toml b/core/partitions/Cargo.toml
index d7fadd93b..c56af72b1 100644
--- a/core/partitions/Cargo.toml
+++ b/core/partitions/Cargo.toml
@@ -36,3 +36,8 @@ message_bus = { workspace = true }
 ringbuffer = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+
+[lints.clippy]
+enum_glob_use = "deny"
+pedantic = "deny"
+nursery = "deny"
diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index 47e71dea2..49c820230 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -91,24 +91,21 @@ impl Partition for IggyPartition {
         let last_dirty_offset = if batch_messages_count == 0 {
             dirty_offset
         } else {
-            dirty_offset + batch_messages_count as u64 - 1
+            dirty_offset + u64::from(batch_messages_count) - 1
         };
 
-        if self.should_increment_offset {
-            self.dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
-        } else {
+        if !self.should_increment_offset {
             self.should_increment_offset = true;
-            self.dirty_offset
-                .store(last_dirty_offset, Ordering::Relaxed);
         }
+        self.dirty_offset
+            .store(last_dirty_offset, Ordering::Relaxed);
 
         let segment_index = self.log.segments().len() - 1;
         self.log.segments_mut()[segment_index].current_position += batch_messages_size;
 
         let journal = self.log.journal_mut();
         journal.info.messages_count += batch_messages_count;
-        journal.info.size += IggyByteSize::from(batch_messages_size as u64);
+        journal.info.size += IggyByteSize::from(u64::from(batch_messages_size));
         journal.info.current_offset = last_dirty_offset;
         if let Some(ts) = batch.first_timestamp()
             && journal.info.first_timestamp == 0
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 6f94da3bc..c3b1e0bc6 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -19,6 +19,7 @@
 
 use crate::IggyPartition;
 use crate::Partition;
+use crate::log::JournalInfo;
 use crate::types::PartitionsConfig;
 use consensus::PlaneIdentity;
 use consensus::{
@@ -49,14 +50,14 @@ use tracing::{debug, warn};
 /// This struct manages ALL partitions assigned to a single shard, regardless
 /// of which stream/topic they belong to.
 ///
-/// Note: The partition_id within IggyNamespace may NOT equal the Vec index.
-/// For example, shard 0 might have partition_ids [0, 2, 4] while shard 1
-/// has partition_ids [1, 3, 5]. The `LocalIdx` provides the actual index
+/// Note: The `partition_id` within `IggyNamespace` may NOT equal the Vec index.
+/// For example, shard 0 might have `partition_ids` [0, 2, 4] while shard 1
+/// has `partition_ids` [1, 3, 5]. The `LocalIdx` provides the actual index
 /// into the `partitions` Vec.
 pub struct IggyPartitions<C> {
     shard_id: ShardId,
     config: PartitionsConfig,
-    /// Collection of partitions, the index of each partition isn't it's ID, but rather an local index (LocalIdx) which is used for lookups.
+    /// Collection of partitions, the index of each partition isn't it's ID, but rather an local index (`LocalIdx`) which is used for lookups.
     ///
     /// Wrapped in `UnsafeCell` for interior mutability — matches the single-threaded
     /// per-shard execution model. Consensus trait methods take `&self` but need to
@@ -67,6 +68,7 @@ pub struct IggyPartitions<C> {
 }
 
 impl<C> IggyPartitions<C> {
+    #[must_use]
     pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
         Self {
             shard_id,
@@ -77,6 +79,7 @@ impl<C> IggyPartitions<C> {
         }
     }
 
+    #[must_use]
     pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig, capacity: usize) -> Self {
         Self {
             shard_id,
@@ -87,7 +90,7 @@ impl<C> IggyPartitions<C> {
         }
     }
 
-    pub fn config(&self) -> &PartitionsConfig {
+    pub const fn config(&self) -> &PartitionsConfig {
         &self.config
     }
 
@@ -103,7 +106,7 @@ impl<C> IggyPartitions<C> {
         unsafe { &mut *self.partitions.get() }
     }
 
-    pub fn shard_id(&self) -> ShardId {
+    pub const fn shard_id(&self) -> ShardId {
         self.shard_id
     }
 
@@ -172,7 +175,7 @@ impl<C> IggyPartitions<C> {
         // If we swapped an element, update its index in the map
         if idx < partitions.len() {
             // Find the namespace that was at the last position (now at idx)
-            for (_ns, lidx) in self.namespace_to_local.iter_mut() {
+            for lidx in self.namespace_to_local.values_mut() {
                 if **lidx == partitions.len() {
                     *lidx = LocalIdx::new(idx);
                     break;
@@ -230,7 +233,7 @@ impl<C> IggyPartitions<C> {
         &mut self.partitions_mut()[idx]
     }
 
-    pub fn consensus(&self) -> Option<&C> {
+    pub const fn consensus(&self) -> Option<&C> {
         self.consensus.as_ref()
     }
 
@@ -258,7 +261,7 @@ impl<C> IggyPartitions<C> {
 
         // Create partition with initialized log
         let stats = Arc::new(PartitionStats::default());
-        let mut partition = IggyPartition::new(stats.clone());
+        let mut partition = IggyPartition::new(stats);
         partition.log.add_persisted_segment(segment, storage);
         partition.offset.store(start_offset, Ordering::Relaxed);
         partition
@@ -276,12 +279,15 @@ impl<C> IggyPartitions<C> {
     /// initial segment, and storage. Skips the control plane metadata broadcasting.
     ///
     /// Corresponds to the "INITIATE PARTITION" phase in the server's flow:
-    /// 1. Control plane: create PartitionMeta (SKIPPED in this method)
+    /// 1. Control plane: create `PartitionMeta` (SKIPPED in this method)
     /// 2. Control plane: broadcast to shards (SKIPPED in this method)
     /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
     ///
     /// Idempotent: subsequent calls for the same namespace are no-ops.
     /// Consensus must be set separately via `set_consensus`.
+    ///
+    /// # Panics
+    /// Panics if segment storage creation fails.
     pub async fn init_partition(&mut self, namespace: IggyNamespace) -> LocalIdx {
         if let Some(idx) = self.local_idx(&namespace) {
             return idx;
@@ -320,7 +326,7 @@ impl<C> IggyPartitions<C> {
 
         // Create partition with initialized log
         let stats = Arc::new(PartitionStats::default());
-        let mut partition = IggyPartition::new(stats.clone());
+        let mut partition = IggyPartition::new(stats);
         partition.log.add_persisted_segment(segment, storage);
         partition.offset.store(start_offset, Ordering::Relaxed);
         partition
@@ -367,10 +373,10 @@ where
         };
 
         let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
-        if !is_old_prepare {
-            self.replicate(message.clone()).await;
-        } else {
+        if is_old_prepare {
             warn!("received old prepare, not replicating");
+        } else {
+            self.replicate(message.clone()).await;
         }
 
         // TODO: Make those assertions be toggleable through an feature flag, so they can be used only by simulator/tests.
@@ -386,7 +392,7 @@ where
         self.send_prepare_ok(header).await;
 
         if consensus.is_follower() {
-            self.commit_journal(&namespace);
+            self.commit_journal(namespace);
         }
     }
 
@@ -498,7 +504,7 @@ where
                 .message_bus()
                 .send_to_client(prepare_header.client, generic_reply)
                 .await
-                .unwrap()
+                .unwrap();
         }
     }
 }
@@ -523,6 +529,8 @@ impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
+    /// # Panics
+    /// Panics if consensus is not initialized.
     pub fn register_namespace_in_pipeline(&self, ns: u64) {
         self.consensus()
             .expect("register_namespace_in_pipeline: consensus not initialized")
@@ -540,8 +548,7 @@ where
         let indexes_end = 4 + indexes_len;
         assert!(
             body.len() >= indexes_end,
-            "prepare body too small for {} indexes",
-            count
+            "prepare body too small for {count} indexes",
         );
 
         let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
@@ -626,7 +633,8 @@ where
         replicate_to_next_in_chain(consensus, message).await;
     }
 
-    fn commit_journal(&self, _namespace: &IggyNamespace) {
+    #[allow(clippy::unused_self, clippy::missing_const_for_fn)]
+    fn commit_journal(&self, _namespace: IggyNamespace) {
         // TODO: Implement commit logic for followers.
         // Walk through journal from last committed to current commit number
         // Apply each entry to the partition state
@@ -665,7 +673,7 @@ where
             .increment_size_bytes(journal_info.size.as_bytes_u64());
         partition
             .stats
-            .increment_messages_count(journal_info.messages_count as u64);
+            .increment_messages_count(u64::from(journal_info.messages_count));
 
         // 3. Check flush thresholds.
         let is_full = segment.is_full();
@@ -703,7 +711,7 @@ where
             let partition = self
                 .get_mut_by_ns(namespace)
                 .expect("commit_messages: partition not found");
-            partition.log.journal_mut().info = Default::default();
+            partition.log.journal_mut().info = JournalInfo::default();
         }
 
         // 4. Advance committed offset (last, so consumers only see offset after data is durable).
@@ -721,7 +729,10 @@ where
         namespace: &IggyNamespace,
         frozen_batches: Vec<iggy_common::IggyMessagesBatch>,
     ) {
-        let batch_count: u32 = frozen_batches.iter().map(|b| b.count()).sum();
+        let batch_count: u32 = frozen_batches
+            .iter()
+            .map(iggy_common::IggyMessagesBatch::count)
+            .sum();
 
         if batch_count == 0 {
             return;
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 1a4169daf..ab281e940 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -31,9 +31,7 @@ impl Storage for Noop {
         0
     }
 
-    async fn read(&self, _offset: usize, buffer: ()) -> () {
-        buffer
-    }
+    async fn read(&self, _offset: usize, _buffer: ()) {}
 }
 
 /// Lookup key for querying messages from the journal.
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index d92b078c4..d997d72b7 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#![allow(clippy::future_not_send)]
+
 mod iggy_partition;
 mod iggy_partitions;
 mod journal;
@@ -48,6 +50,8 @@ pub trait Partition {
         async { Err(IggyError::FeatureUnavailable) }
     }
 
+    /// # Errors
+    /// Returns `IggyError::FeatureUnavailable` by default.
     fn store_consumer_offset(
         &self,
         consumer: PollingConsumer,
diff --git a/core/partitions/src/log.rs b/core/partitions/src/log.rs
index 9a9fd11bb..7d88f60c2 100644
--- a/core/partitions/src/log.rs
+++ b/core/partitions/src/log.rs
@@ -133,23 +133,23 @@ where
     S: Storage,
     J: Debug + Journal<S>,
 {
-    pub fn has_segments(&self) -> bool {
+    pub const fn has_segments(&self) -> bool {
         !self.segments.is_empty()
     }
 
-    pub fn segments(&self) -> &Vec<Segment> {
+    pub const fn segments(&self) -> &Vec<Segment> {
         &self.segments
     }
 
-    pub fn segments_mut(&mut self) -> &mut Vec<Segment> {
+    pub const fn segments_mut(&mut self) -> &mut Vec<Segment> {
         &mut self.segments
     }
 
-    pub fn storages_mut(&mut self) -> &mut Vec<SegmentStorage> {
+    pub const fn storages_mut(&mut self) -> &mut Vec<SegmentStorage> {
         &mut self.storage
     }
 
-    pub fn storages(&self) -> &Vec<SegmentStorage> {
+    pub const fn storages(&self) -> &Vec<SegmentStorage> {
         &self.storage
     }
 
@@ -177,11 +177,11 @@ where
             .expect("active storage called on empty log")
     }
 
-    pub fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
+    pub const fn indexes(&self) -> &Vec<Option<IggyIndexesMut>> {
         &self.indexes
     }
 
-    pub fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> {
+    pub const fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> {
         &mut self.indexes
     }
 
@@ -230,11 +230,11 @@ where
         }
     }
 
-    pub fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
+    pub const fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
         &self.in_flight
     }
 
-    pub fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight {
+    pub const fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight {
         &mut self.in_flight
     }
 
@@ -252,11 +252,11 @@ where
     S: Storage,
     J: Debug + Journal<S>,
 {
-    pub fn journal_mut(&mut self) -> &mut JournalState<J> {
+    pub const fn journal_mut(&mut self) -> &mut JournalState<J> {
         &mut self.journal
     }
 
-    pub fn journal(&self) -> &JournalState<J> {
+    pub const fn journal(&self) -> &JournalState<J> {
         &self.journal
     }
 }
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index 886cb715e..1a21609a5 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -26,7 +26,8 @@ pub struct PollingArgs {
 }
 
 impl PollingArgs {
-    pub fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) -> Self {
+    #[must_use]
+    pub const fn new(strategy: PollingStrategy, count: u32, auto_commit: bool) -> Self {
         Self {
             strategy,
             count,
@@ -45,9 +46,9 @@ pub struct SendMessagesResult {
 // TODO(hubcio): unify with server's `PollingConsumer` in `streaming/polling_consumer.rs`.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PollingConsumer {
-    /// Regular consumer with (consumer_id, partition_id)
+    /// Regular consumer with (`consumer_id`, `partition_id`)
     Consumer(usize, usize),
-    /// Consumer group with (group_id, member_id)
+    /// Consumer group with (`group_id`, `member_id`)
     ConsumerGroup(usize, usize),
 }
 
@@ -65,7 +66,8 @@ pub struct AppendResult {
 }
 
 impl AppendResult {
-    pub fn new(start_offset: u64, end_offset: u64, messages_count: u32) -> Self {
+    #[must_use]
+    pub const fn new(start_offset: u64, end_offset: u64, messages_count: u32) -> Self {
         Self {
             start_offset,
             end_offset,
@@ -75,7 +77,8 @@ impl AppendResult {
 
     /// Returns the number of offsets in the range.
     #[inline]
-    pub fn offset_count(&self) -> u64 {
+    #[must_use]
+    pub const fn offset_count(&self) -> u64 {
         self.end_offset - self.start_offset + 1
     }
 }
@@ -114,12 +117,11 @@ pub struct PartitionOffsets {
 }
 
 impl PartitionOffsets {
+    #[must_use]
     pub fn new(commit_offset: u64, write_offset: u64) -> Self {
         debug_assert!(
             write_offset >= commit_offset,
-            "write_offset ({}) must be >= commit_offset ({})",
-            write_offset,
-            commit_offset
+            "write_offset ({write_offset}) must be >= commit_offset ({commit_offset})",
         );
         Self {
             commit_offset,
@@ -128,7 +130,8 @@ impl PartitionOffsets {
     }
 
     /// Create offsets for an empty partition.
-    pub fn empty() -> Self {
+    #[must_use]
+    pub const fn empty() -> Self {
         Self {
             commit_offset: 0,
             write_offset: 0,
@@ -136,17 +139,20 @@ impl PartitionOffsets {
     }
 
     /// Returns true if there are uncommitted (prepared) messages.
-    pub fn has_uncommitted(&self) -> bool {
+    #[must_use]
+    pub const fn has_uncommitted(&self) -> bool {
         self.write_offset > self.commit_offset
     }
 
     /// Returns the number of uncommitted messages.
-    pub fn uncommitted_count(&self) -> u64 {
+    #[must_use]
+    pub const fn uncommitted_count(&self) -> u64 {
         self.write_offset - self.commit_offset
     }
 
     /// Returns true if commit and write offsets are equal.
-    pub fn is_fully_committed(&self) -> bool {
+    #[must_use]
+    pub const fn is_fully_committed(&self) -> bool {
         self.write_offset == self.commit_offset
     }
 }
@@ -179,6 +185,7 @@ impl PartitionsConfig {
     /// TODO: This is a stub waiting for completion of issue to move server config
     /// to shared module. Real implementation should use:
     /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.log`
+    #[must_use]
     pub fn get_messages_path(
         &self,
         stream_id: usize,
@@ -187,8 +194,7 @@ impl PartitionsConfig {
         start_offset: u64,
     ) -> String {
         format!(
-            "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.log",
-            stream_id, topic_id, partition_id, start_offset
+            "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.log",
         )
     }
 
@@ -197,6 +203,7 @@ impl PartitionsConfig {
     /// TODO: This is a stub waiting for completion of issue to move server config
     /// to shared module. Real implementation should use:
     /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.index`
+    #[must_use]
     pub fn get_index_path(
         &self,
         stream_id: usize,
@@ -205,8 +212,7 @@ impl PartitionsConfig {
         start_offset: u64,
     ) -> String {
         format!(
-            "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.index",
-            stream_id, topic_id, partition_id, start_offset
+            "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.index",
         )
     }
 }

Reply via email to