This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f3fef3dda feat(shard): create `shard` crate and generic `IggyShard` 
(#2811)
f3fef3dda is described below

commit f3fef3ddab97bd8a6b166a5041833956a12e6752
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Feb 26 19:42:24 2026 +0530

    feat(shard): create `shard` crate and generic `IggyShard` (#2811)
    
    - Created `shard` crate and generic `IggyShard`
    - `IggyShard::dispatch()` routes incoming network messages to the
    correct consensus plane via `PlaneIdentity`
---
 .github/workflows/_common.yml        |   1 +
 Cargo.lock                           |  13 +++
 Cargo.toml                           |   1 +
 DEPENDENCIES.md                      |   1 +
 core/{simulator => shard}/Cargo.toml |   5 +-
 core/shard/src/lib.rs                | 152 +++++++++++++++++++++++++++++++++++
 core/simulator/Cargo.toml            |   1 +
 core/simulator/src/deps.rs           |  19 +----
 core/simulator/src/lib.rs            |  42 +++-------
 core/simulator/src/replica.rs        | 119 ++++++++++++---------------
 10 files changed, 234 insertions(+), 120 deletions(-)

diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index ddd18e9e8..fb746ba4f 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -100,6 +100,7 @@ jobs:
             sdk
             security
             server
+            shard
             test
             web
             consensus
diff --git a/Cargo.lock b/Cargo.lock
index 47adb0aa1..76d8605cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8427,6 +8427,18 @@ dependencies = [
  "keccak",
 ]
 
+[[package]]
+name = "shard"
+version = "0.1.0"
+dependencies = [
+ "consensus",
+ "iggy_common",
+ "journal",
+ "message_bus",
+ "metadata",
+ "partitions",
+]
+
 [[package]]
 name = "sharded-slab"
 version = "0.1.7"
@@ -8513,6 +8525,7 @@ dependencies = [
  "message_bus",
  "metadata",
  "partitions",
+ "shard",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 55794a308..d3d43c566 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ members = [
     "core/partitions",
     "core/sdk",
     "core/server",
+    "core/shard",
     "core/simulator",
     "core/tools",
     "examples/rust",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 4c555b8c7..ffe0d501e 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -728,6 +728,7 @@ server: 0.7.0, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
+shard: 0.1.0, "N/A",
 sharded-slab: 0.1.7, "MIT",
 shlex: 1.3.0, "Apache-2.0 OR MIT",
 signal-hook-registry: 1.4.8, "Apache-2.0 OR MIT",
diff --git a/core/simulator/Cargo.toml b/core/shard/Cargo.toml
similarity index 90%
copy from core/simulator/Cargo.toml
copy to core/shard/Cargo.toml
index b8e25866d..c5b7621fd 100644
--- a/core/simulator/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -16,15 +16,12 @@
 # under the License.
 
 [package]
-name = "simulator"
+name = "shard"
 version = "0.1.0"
 edition = "2024"
 
 [dependencies]
-bytemuck = { workspace = true }
-bytes = { workspace = true }
 consensus = { path = "../consensus" }
-futures = { workspace = true }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
new file mode 100644
index 000000000..0b41b1d84
--- /dev/null
+++ b/core/shard/src/lib.rs
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use consensus::{MuxPlane, NamespacedPipeline, Plane, PlaneIdentity, 
VsrConsensus};
+use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader};
+use iggy_common::message::{Message, MessageBag};
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::variadic;
+use journal::{Journal, JournalHandle};
+use message_bus::MessageBus;
+use metadata::IggyMetadata;
+use metadata::stm::StateMachine;
+use partitions::IggyPartitions;
+
+// variadic!(Metadata, Partitions) = (Metadata, (Partitions, ()))
+type PlaneInner<B, J, S, M> = (
+    IggyMetadata<VsrConsensus<B>, J, S, M>,
+    (IggyPartitions<VsrConsensus<B, NamespacedPipeline>>, ()),
+);
+
+pub type ShardPlane<B, J, S, M> = MuxPlane<PlaneInner<B, J, S, M>>;
+
+pub struct IggyShard<B, J, S, M>
+where
+    B: MessageBus,
+{
+    pub id: u8,
+    pub name: String,
+    pub plane: ShardPlane<B, J, S, M>,
+}
+
+impl<B, J, S, M> IggyShard<B, J, S, M>
+where
+    B: MessageBus,
+{
+    /// Create a new shard from pre-built metadata and partition planes.
+    pub fn new(
+        id: u8,
+        name: String,
+        metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
+        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+    ) -> Self {
+        let plane = MuxPlane::new(variadic!(metadata, partitions));
+        Self { id, name, plane }
+    }
+
+    /// Dispatch an incoming network message to the appropriate consensus 
plane.
+    ///
+    /// Routes requests, replication messages, and acks to either the metadata
+    /// plane or the partitions plane based on `PlaneIdentity::is_applicable`.
+    pub async fn on_message(&self, message: Message<GenericHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        match MessageBag::from(message) {
+            MessageBag::Request(request) => self.on_request(request).await,
+            MessageBag::Prepare(prepare) => self.on_replicate(prepare).await,
+            MessageBag::PrepareOk(prepare_ok) => self.on_ack(prepare_ok).await,
+        }
+    }
+
+    pub async fn on_request(&self, request: Message<RequestHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&request) {
+            planes.0.on_request(request).await;
+        } else {
+            planes.1.0.on_request(request).await;
+        }
+    }
+
+    pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&prepare) {
+            planes.0.on_replicate(prepare).await;
+        } else {
+            planes.1.0.on_replicate(prepare).await;
+        }
+    }
+
+    pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = 
u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&prepare_ok) {
+            planes.0.on_ack(prepare_ok).await;
+        } else {
+            planes.1.0.on_ack(prepare_ok).await;
+        }
+    }
+
+    pub fn init_partition(&mut self, namespace: IggyNamespace)
+    where
+        B: MessageBus<
+                Replica = u8,
+                Data = 
iggy_common::message::Message<iggy_common::header::GenericHeader>,
+                Client = u128,
+            >,
+    {
+        let partitions = &mut self.plane.inner_mut().1.0;
+        partitions.init_partition_in_memory(namespace);
+        partitions.register_namespace_in_pipeline(namespace.inner());
+    }
+}
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index b8e25866d..fad276777 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -30,3 +30,4 @@ journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 metadata = { path = "../metadata" }
 partitions = { path = "../partitions" }
+shard = { path = "../shard" }
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 9630c3aa3..622704820 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -15,19 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::bus::SharedMemBus;
 use bytes::Bytes;
-use consensus::{
-    MuxPlane, {NamespacedPipeline, VsrConsensus},
-};
 use iggy_common::header::PrepareHeader;
 use iggy_common::message::Message;
 use iggy_common::variadic;
 use journal::{Journal, JournalHandle, Storage};
+use metadata::MuxStateMachine;
 use metadata::stm::consumer_group::ConsumerGroups;
 use metadata::stm::stream::Streams;
 use metadata::stm::user::Users;
-use metadata::{IggyMetadata, MuxStateMachine};
 use std::cell::{Cell, RefCell, UnsafeCell};
 use std::collections::HashMap;
 
@@ -151,16 +147,5 @@ impl JournalHandle for SimJournal<MemStorage> {
 #[derive(Debug, Default)]
 pub struct SimSnapshot {}
 
-/// Type aliases for simulator metadata
+/// Type alias for simulator state machine
 pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams, 
ConsumerGroups)>;
-pub type SimMetadata = IggyMetadata<
-    VsrConsensus<SharedMemBus>,
-    SimJournal<MemStorage>,
-    SimSnapshot,
-    SimMuxStateMachine,
->;
-
-/// Type alias for simulator partitions
-pub type ReplicaPartitions =
-    partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
-pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 13858b62c..0d0b55bb3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,11 +21,10 @@ pub mod deps;
 pub mod replica;
 
 use bus::MemBus;
-use consensus::{Plane, PlaneIdentity};
-use iggy_common::header::{GenericHeader, ReplyHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_common::header::ReplyHeader;
+use iggy_common::message::Message;
 use message_bus::MessageBus;
-use replica::Replica;
+use replica::{Replica, new_replica};
 use std::sync::Arc;
 
 pub struct Simulator {
@@ -54,7 +53,7 @@ impl Simulator {
         let message_bus = Arc::new(message_bus);
         let replicas = (0..replica_count)
             .map(|i| {
-                Replica::new(
+                new_replica(
                     i as u8,
                     format!("replica-{}", i),
                     Arc::clone(&message_bus),
@@ -77,7 +76,7 @@ impl Simulator {
         let message_bus = Arc::new(message_bus);
         let replicas = (0..replica_count)
             .map(|i| {
-                Replica::new(
+                new_replica(
                     i as u8,
                     format!("replica-{}", i),
                     Arc::clone(&message_bus),
@@ -114,31 +113,12 @@ impl Simulator {
         None
     }
 
-    async fn dispatch_to_replica(&self, replica: &Replica, message: 
Message<GenericHeader>) {
-        let planes = replica.plane.inner();
-        match MessageBag::from(message) {
-            MessageBag::Request(request) => {
-                if planes.0.is_applicable(&request) {
-                    planes.0.on_request(request).await;
-                } else {
-                    planes.1.0.on_request(request).await;
-                }
-            }
-            MessageBag::Prepare(prepare) => {
-                if planes.0.is_applicable(&prepare) {
-                    planes.0.on_replicate(prepare).await;
-                } else {
-                    planes.1.0.on_replicate(prepare).await;
-                }
-            }
-            MessageBag::PrepareOk(prepare_ok) => {
-                if planes.0.is_applicable(&prepare_ok) {
-                    planes.0.on_ack(prepare_ok).await;
-                } else {
-                    planes.1.0.on_ack(prepare_ok).await;
-                }
-            }
-        }
+    async fn dispatch_to_replica(
+        &self,
+        replica: &Replica,
+        message: Message<iggy_common::header::GenericHeader>,
+    ) {
+        replica.on_message(message).await;
     }
 }
 
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index b7728f622..7b3d9ee7f 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -16,88 +16,71 @@
 // under the License.
 
 use crate::bus::{MemBus, SharedMemBus};
-use crate::deps::{
-    ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimPlane, 
SimSnapshot,
-};
+use crate::deps::{MemStorage, SimJournal, SimMuxStateMachine, SimSnapshot};
 use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
-use iggy_common::sharding::{IggyNamespace, ShardId};
-use iggy_common::{IggyByteSize, variadic};
+use iggy_common::IggyByteSize;
+use iggy_common::sharding::ShardId;
+use iggy_common::variadic;
+use metadata::IggyMetadata;
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
-use partitions::PartitionsConfig;
+use partitions::{IggyPartitions, PartitionsConfig};
 use std::sync::Arc;
 
 // TODO: Make configurable
 const CLUSTER_ID: u128 = 1;
 
-pub struct Replica {
-    pub id: u8,
-    pub name: String,
-    pub replica_count: u8,
-    pub plane: SimPlane,
-    pub bus: Arc<MemBus>,
-}
+// For now there is only one shard per replica,
+// we will add support for multiple shards per replica in the future.
+pub type Replica =
+    shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot, 
SimMuxStateMachine>;
 
-impl Replica {
-    pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) -> 
Self {
-        let users: Users = UsersInner::new().into();
-        let streams: Streams = StreamsInner::new().into();
-        let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
-        let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
+pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) 
-> Replica {
+    let users: Users = UsersInner::new().into();
+    let streams: Streams = StreamsInner::new().into();
+    let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
+    let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
 
-        // Metadata uses namespace=0 (not partition-scoped)
-        let metadata_consensus = VsrConsensus::new(
-            CLUSTER_ID,
-            id,
-            replica_count,
-            0,
-            SharedMemBus(Arc::clone(&bus)),
-            LocalPipeline::new(),
-        );
-        metadata_consensus.init();
-        let metadata = SimMetadata {
-            consensus: Some(metadata_consensus),
-            journal: Some(SimJournal::default()),
-            snapshot: Some(SimSnapshot::default()),
-            mux_stm: mux,
-        };
+    // Metadata uses namespace=0 (not partition-scoped)
+    let metadata_consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        id,
+        replica_count,
+        0,
+        SharedMemBus(Arc::clone(&bus)),
+        LocalPipeline::new(),
+    );
+    metadata_consensus.init();
 
-        let partitions_config = PartitionsConfig {
-            messages_required_to_save: 1000,
-            size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 
1024),
-            enforce_fsync: false, // Disable fsync for simulation
-            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GiB 
segments
-        };
+    let metadata = IggyMetadata {
+        consensus: Some(metadata_consensus),
+        journal: Some(SimJournal::<MemStorage>::default()),
+        snapshot: Some(SimSnapshot::default()),
+        mux_stm: mux,
+    };
 
-        let mut partitions = ReplicaPartitions::new(ShardId::new(id as u16), 
partitions_config);
+    let partitions_config = PartitionsConfig {
+        messages_required_to_save: 1000,
+        size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024),
+        enforce_fsync: false, //Disable fsync for simulation
+        segment_size: IggyByteSize::from(1024 * 1024 * 1024),
+    };
 
-        // TODO: namespace=0 collides with metadata consensus. Safe for now 
because the simulator
-        // routes by Operation type, but a shared view change bus would 
produce namespace collisions.
-        let partition_consensus = VsrConsensus::new(
-            CLUSTER_ID,
-            id,
-            replica_count,
-            0,
-            SharedMemBus(Arc::clone(&bus)),
-            NamespacedPipeline::new(),
-        );
-        partition_consensus.init();
-        partitions.set_consensus(partition_consensus);
-        let plane = SimPlane::new(variadic!(metadata, partitions));
+    let mut partitions = IggyPartitions::new(ShardId::new(id as u16), 
partitions_config);
 
-        Self {
-            id,
-            name,
-            plane,
-            replica_count,
-            bus,
-        }
-    }
+    // TODO: namespace=0 collides with metadata consensus. Safe for now 
because the simulator
+    // routes by Operation type, but a shared view change bus would produce 
namespace collisions.
+    let partition_consensus = VsrConsensus::new(
+        CLUSTER_ID,
+        id,
+        replica_count,
+        0,
+        SharedMemBus(Arc::clone(&bus)),
+        NamespacedPipeline::new(),
+    );
+    partition_consensus.init();
+    partitions.set_consensus(partition_consensus);
 
-    pub fn init_partition(&mut self, namespace: IggyNamespace) {
-        let partitions = &mut self.plane.inner_mut().1.0;
-        partitions.init_partition_in_memory(namespace);
-        partitions.register_namespace_in_pipeline(namespace.inner());
-    }
+    shard::IggyShard::new(id, name, metadata, partitions)
 }

Reply via email to