This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f3fef3dda feat(shard): create `shard` crate and generic `IggyShard`
(#2811)
f3fef3dda is described below
commit f3fef3ddab97bd8a6b166a5041833956a12e6752
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Feb 26 19:42:24 2026 +0530
feat(shard): create `shard` crate and generic `IggyShard` (#2811)
- Created `shard` crate and generic `IggyShard`
- `IggyShard::on_message()` routes incoming network messages to the
correct consensus plane via `PlaneIdentity`
---
.github/workflows/_common.yml | 1 +
Cargo.lock | 13 +++
Cargo.toml | 1 +
DEPENDENCIES.md | 1 +
core/{simulator => shard}/Cargo.toml | 5 +-
core/shard/src/lib.rs | 152 +++++++++++++++++++++++++++++++++++
core/simulator/Cargo.toml | 1 +
core/simulator/src/deps.rs | 19 +----
core/simulator/src/lib.rs | 42 +++-------
core/simulator/src/replica.rs | 119 ++++++++++++---------------
10 files changed, 234 insertions(+), 120 deletions(-)
diff --git a/.github/workflows/_common.yml b/.github/workflows/_common.yml
index ddd18e9e8..fb746ba4f 100644
--- a/.github/workflows/_common.yml
+++ b/.github/workflows/_common.yml
@@ -100,6 +100,7 @@ jobs:
sdk
security
server
+ shard
test
web
consensus
diff --git a/Cargo.lock b/Cargo.lock
index 47adb0aa1..76d8605cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8427,6 +8427,18 @@ dependencies = [
"keccak",
]
+[[package]]
+name = "shard"
+version = "0.1.0"
+dependencies = [
+ "consensus",
+ "iggy_common",
+ "journal",
+ "message_bus",
+ "metadata",
+ "partitions",
+]
+
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -8513,6 +8525,7 @@ dependencies = [
"message_bus",
"metadata",
"partitions",
+ "shard",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 55794a308..d3d43c566 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ members = [
"core/partitions",
"core/sdk",
"core/server",
+ "core/shard",
"core/simulator",
"core/tools",
"examples/rust",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 4c555b8c7..ffe0d501e 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -728,6 +728,7 @@ server: 0.7.0, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
+shard: 0.1.0, "N/A",
sharded-slab: 0.1.7, "MIT",
shlex: 1.3.0, "Apache-2.0 OR MIT",
signal-hook-registry: 1.4.8, "Apache-2.0 OR MIT",
diff --git a/core/simulator/Cargo.toml b/core/shard/Cargo.toml
similarity index 90%
copy from core/simulator/Cargo.toml
copy to core/shard/Cargo.toml
index b8e25866d..c5b7621fd 100644
--- a/core/simulator/Cargo.toml
+++ b/core/shard/Cargo.toml
@@ -16,15 +16,12 @@
# under the License.
[package]
-name = "simulator"
+name = "shard"
version = "0.1.0"
edition = "2024"
[dependencies]
-bytemuck = { workspace = true }
-bytes = { workspace = true }
consensus = { path = "../consensus" }
-futures = { workspace = true }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
new file mode 100644
index 000000000..0b41b1d84
--- /dev/null
+++ b/core/shard/src/lib.rs
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use consensus::{MuxPlane, NamespacedPipeline, Plane, PlaneIdentity,
VsrConsensus};
+use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader};
+use iggy_common::message::{Message, MessageBag};
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::variadic;
+use journal::{Journal, JournalHandle};
+use message_bus::MessageBus;
+use metadata::IggyMetadata;
+use metadata::stm::StateMachine;
+use partitions::IggyPartitions;
+
+// Nested-tuple list of the shard's planes, written out by hand:
+// variadic!(Metadata, Partitions) = (Metadata, (Partitions, ())).
+// The metadata plane is therefore reached as `.0` and the partitions plane as `.1.0`.
+type PlaneInner<B, J, S, M> = (
+ IggyMetadata<VsrConsensus<B>, J, S, M>,
+ (IggyPartitions<VsrConsensus<B, NamespacedPipeline>>, ()),
+);
+
+/// `MuxPlane` wrapping the metadata plane and the partitions plane of one shard.
+pub type ShardPlane<B, J, S, M> = MuxPlane<PlaneInner<B, J, S, M>>;
+
+/// A shard holding one metadata plane and one partitions plane behind a
+/// single [`ShardPlane`].
+pub struct IggyShard<B, J, S, M>
+where
+ B: MessageBus,
+{
+ /// Numeric identifier given to the shard at construction.
+ pub id: u8,
+ /// Name given to the shard at construction.
+ pub name: String,
+ /// Combined metadata + partitions plane that incoming messages are routed to.
+ pub plane: ShardPlane<B, J, S, M>,
+}
+
+impl<B, J, S, M> IggyShard<B, J, S, M>
+where
+    B: MessageBus,
+{
+    /// Create a new shard from pre-built metadata and partition planes.
+    ///
+    /// The planes are stored as `variadic!(metadata, partitions)`, so the
+    /// metadata plane is the first element and the partitions plane the second.
+    pub fn new(
+        id: u8,
+        name: String,
+        metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
+        partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
+    ) -> Self {
+        let plane = MuxPlane::new(variadic!(metadata, partitions));
+        Self { id, name, plane }
+    }
+
+    /// Prepare this shard to host the partition identified by `namespace`.
+    ///
+    /// Calls `init_partition_in_memory` on the partitions plane and then passes
+    /// `namespace.inner()` to `register_namespace_in_pipeline`.
+    pub fn init_partition(&mut self, namespace: IggyNamespace)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    {
+        // `.1.0` selects the partitions plane (second element of the plane list).
+        let partitions = &mut self.plane.inner_mut().1.0;
+        partitions.init_partition_in_memory(namespace);
+        partitions.register_namespace_in_pipeline(namespace.inner());
+    }
+}
+
+// Message handling lives in its own impl block so the bounds it needs are
+// stated once instead of being repeated on every method.
+impl<B, J, S, M> IggyShard<B, J, S, M>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    J: JournalHandle,
+    <J as JournalHandle>::Target: Journal<
+        <J as JournalHandle>::Storage,
+        Entry = Message<PrepareHeader>,
+        Header = PrepareHeader,
+    >,
+    M: StateMachine<Input = Message<PrepareHeader>>,
+{
+    /// Dispatch an incoming network message to the appropriate consensus plane.
+    ///
+    /// Routes requests, replication messages, and acks to either the metadata
+    /// plane or the partitions plane based on `PlaneIdentity::is_applicable`.
+    pub async fn on_message(&self, message: Message<GenericHeader>) {
+        match MessageBag::from(message) {
+            MessageBag::Request(request) => self.on_request(request).await,
+            MessageBag::Prepare(prepare) => self.on_replicate(prepare).await,
+            MessageBag::PrepareOk(prepare_ok) => self.on_ack(prepare_ok).await,
+        }
+    }
+
+    /// Hand a request to the metadata plane when that plane reports it as
+    /// applicable, otherwise to the partitions plane.
+    pub async fn on_request(&self, request: Message<RequestHeader>) {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&request) {
+            planes.0.on_request(request).await;
+        } else {
+            planes.1.0.on_request(request).await;
+        }
+    }
+
+    /// Hand a prepare message to the metadata plane when that plane reports it
+    /// as applicable, otherwise to the partitions plane.
+    pub async fn on_replicate(&self, prepare: Message<PrepareHeader>) {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&prepare) {
+            planes.0.on_replicate(prepare).await;
+        } else {
+            planes.1.0.on_replicate(prepare).await;
+        }
+    }
+
+    /// Hand a prepare-ok message to the metadata plane when that plane reports
+    /// it as applicable, otherwise to the partitions plane.
+    pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>) {
+        let planes = self.plane.inner();
+        if planes.0.is_applicable(&prepare_ok) {
+            planes.0.on_ack(prepare_ok).await;
+        } else {
+            planes.1.0.on_ack(prepare_ok).await;
+        }
+    }
+}
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index b8e25866d..fad276777 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -30,3 +30,4 @@ journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
metadata = { path = "../metadata" }
partitions = { path = "../partitions" }
+shard = { path = "../shard" }
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 9630c3aa3..622704820 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -15,19 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use crate::bus::SharedMemBus;
use bytes::Bytes;
-use consensus::{
- MuxPlane, {NamespacedPipeline, VsrConsensus},
-};
use iggy_common::header::PrepareHeader;
use iggy_common::message::Message;
use iggy_common::variadic;
use journal::{Journal, JournalHandle, Storage};
+use metadata::MuxStateMachine;
use metadata::stm::consumer_group::ConsumerGroups;
use metadata::stm::stream::Streams;
use metadata::stm::user::Users;
-use metadata::{IggyMetadata, MuxStateMachine};
use std::cell::{Cell, RefCell, UnsafeCell};
use std::collections::HashMap;
@@ -151,16 +147,5 @@ impl JournalHandle for SimJournal<MemStorage> {
#[derive(Debug, Default)]
pub struct SimSnapshot {}
-/// Type aliases for simulator metadata
+/// Type alias for simulator state machine
pub type SimMuxStateMachine = MuxStateMachine<variadic!(Users, Streams,
ConsumerGroups)>;
-pub type SimMetadata = IggyMetadata<
- VsrConsensus<SharedMemBus>,
- SimJournal<MemStorage>,
- SimSnapshot,
- SimMuxStateMachine,
->;
-
-/// Type alias for simulator partitions
-pub type ReplicaPartitions =
- partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
-pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 13858b62c..0d0b55bb3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,11 +21,10 @@ pub mod deps;
pub mod replica;
use bus::MemBus;
-use consensus::{Plane, PlaneIdentity};
-use iggy_common::header::{GenericHeader, ReplyHeader};
-use iggy_common::message::{Message, MessageBag};
+use iggy_common::header::ReplyHeader;
+use iggy_common::message::Message;
use message_bus::MessageBus;
-use replica::Replica;
+use replica::{Replica, new_replica};
use std::sync::Arc;
pub struct Simulator {
@@ -54,7 +53,7 @@ impl Simulator {
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
- Replica::new(
+ new_replica(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
@@ -77,7 +76,7 @@ impl Simulator {
let message_bus = Arc::new(message_bus);
let replicas = (0..replica_count)
.map(|i| {
- Replica::new(
+ new_replica(
i as u8,
format!("replica-{}", i),
Arc::clone(&message_bus),
@@ -114,31 +113,12 @@ impl Simulator {
None
}
- async fn dispatch_to_replica(&self, replica: &Replica, message:
Message<GenericHeader>) {
- let planes = replica.plane.inner();
- match MessageBag::from(message) {
- MessageBag::Request(request) => {
- if planes.0.is_applicable(&request) {
- planes.0.on_request(request).await;
- } else {
- planes.1.0.on_request(request).await;
- }
- }
- MessageBag::Prepare(prepare) => {
- if planes.0.is_applicable(&prepare) {
- planes.0.on_replicate(prepare).await;
- } else {
- planes.1.0.on_replicate(prepare).await;
- }
- }
- MessageBag::PrepareOk(prepare_ok) => {
- if planes.0.is_applicable(&prepare_ok) {
- planes.0.on_ack(prepare_ok).await;
- } else {
- planes.1.0.on_ack(prepare_ok).await;
- }
- }
- }
+ async fn dispatch_to_replica(
+ &self,
+ replica: &Replica,
+ message: Message<iggy_common::header::GenericHeader>,
+ ) {
+ replica.on_message(message).await;
}
}
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index b7728f622..7b3d9ee7f 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -16,88 +16,71 @@
// under the License.
use crate::bus::{MemBus, SharedMemBus};
-use crate::deps::{
- ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimPlane,
SimSnapshot,
-};
+use crate::deps::{MemStorage, SimJournal, SimMuxStateMachine, SimSnapshot};
use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
-use iggy_common::sharding::{IggyNamespace, ShardId};
-use iggy_common::{IggyByteSize, variadic};
+use iggy_common::IggyByteSize;
+use iggy_common::sharding::ShardId;
+use iggy_common::variadic;
+use metadata::IggyMetadata;
use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
use metadata::stm::stream::{Streams, StreamsInner};
use metadata::stm::user::{Users, UsersInner};
-use partitions::PartitionsConfig;
+use partitions::{IggyPartitions, PartitionsConfig};
use std::sync::Arc;
// TODO: Make configurable
const CLUSTER_ID: u128 = 1;
-pub struct Replica {
- pub id: u8,
- pub name: String,
- pub replica_count: u8,
- pub plane: SimPlane,
- pub bus: Arc<MemBus>,
-}
+// A simulator replica is currently a single shard; support for multiple
+// shards per replica is planned.
+pub type Replica =
+ shard::IggyShard<SharedMemBus, SimJournal<MemStorage>, SimSnapshot,
SimMuxStateMachine>;
-impl Replica {
- pub fn new(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8) ->
Self {
- let users: Users = UsersInner::new().into();
- let streams: Streams = StreamsInner::new().into();
- let consumer_groups: ConsumerGroups =
ConsumerGroupsInner::new().into();
- let mux = SimMuxStateMachine::new(variadic!(users, streams,
consumer_groups));
+pub fn new_replica(id: u8, name: String, bus: Arc<MemBus>, replica_count: u8)
-> Replica {
+ let users: Users = UsersInner::new().into();
+ let streams: Streams = StreamsInner::new().into();
+ let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
+ let mux = SimMuxStateMachine::new(variadic!(users, streams,
consumer_groups));
- // Metadata uses namespace=0 (not partition-scoped)
- let metadata_consensus = VsrConsensus::new(
- CLUSTER_ID,
- id,
- replica_count,
- 0,
- SharedMemBus(Arc::clone(&bus)),
- LocalPipeline::new(),
- );
- metadata_consensus.init();
- let metadata = SimMetadata {
- consensus: Some(metadata_consensus),
- journal: Some(SimJournal::default()),
- snapshot: Some(SimSnapshot::default()),
- mux_stm: mux,
- };
+ // Metadata uses namespace=0 (not partition-scoped)
+ let metadata_consensus = VsrConsensus::new(
+ CLUSTER_ID,
+ id,
+ replica_count,
+ 0,
+ SharedMemBus(Arc::clone(&bus)),
+ LocalPipeline::new(),
+ );
+ metadata_consensus.init();
- let partitions_config = PartitionsConfig {
- messages_required_to_save: 1000,
- size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 *
1024),
- enforce_fsync: false, // Disable fsync for simulation
- segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GiB
segments
- };
+ let metadata = IggyMetadata {
+ consensus: Some(metadata_consensus),
+ journal: Some(SimJournal::<MemStorage>::default()),
+ snapshot: Some(SimSnapshot::default()),
+ mux_stm: mux,
+ };
- let mut partitions = ReplicaPartitions::new(ShardId::new(id as u16),
partitions_config);
+ let partitions_config = PartitionsConfig {
+ messages_required_to_save: 1000,
+ size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024),
+ enforce_fsync: false, //Disable fsync for simulation
+ segment_size: IggyByteSize::from(1024 * 1024 * 1024),
+ };
- // TODO: namespace=0 collides with metadata consensus. Safe for now
because the simulator
- // routes by Operation type, but a shared view change bus would
produce namespace collisions.
- let partition_consensus = VsrConsensus::new(
- CLUSTER_ID,
- id,
- replica_count,
- 0,
- SharedMemBus(Arc::clone(&bus)),
- NamespacedPipeline::new(),
- );
- partition_consensus.init();
- partitions.set_consensus(partition_consensus);
- let plane = SimPlane::new(variadic!(metadata, partitions));
+ let mut partitions = IggyPartitions::new(ShardId::new(id as u16),
partitions_config);
- Self {
- id,
- name,
- plane,
- replica_count,
- bus,
- }
- }
+ // TODO: namespace=0 collides with metadata consensus. Safe for now
because the simulator
+ // routes by Operation type, but a shared view change bus would produce
namespace collisions.
+ let partition_consensus = VsrConsensus::new(
+ CLUSTER_ID,
+ id,
+ replica_count,
+ 0,
+ SharedMemBus(Arc::clone(&bus)),
+ NamespacedPipeline::new(),
+ );
+ partition_consensus.init();
+ partitions.set_consensus(partition_consensus);
- pub fn init_partition(&mut self, namespace: IggyNamespace) {
- let partitions = &mut self.plane.inner_mut().1.0;
- partitions.init_partition_in_memory(namespace);
- partitions.register_namespace_in_pipeline(namespace.inner());
- }
+ shard::IggyShard::new(id, name, metadata, partitions)
}