This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 3432714ab feat(simulator): implement PacketSimulator for deterministic 
simulator (#2769)
3432714ab is described below

commit 3432714ab5d5e477e0aad05fee0c104c3514baca
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Feb 27 14:47:39 2026 +0530

    feat(simulator): implement PacketSimulator for deterministic simulator 
(#2769)
    
    Summary:
    - Adds a `PacketSimulator` that models a full network fault-injection
    layer: per-path latency (exponential distribution), packet loss, link
    capacity limits with random eviction, automatic network partitioning
    with configurable lifecycle, and per-path clogging.
    - Adds a `ReadyQueue` — a min-heap priority queue with reservoir-sampled
    random ready removal, used to order packets by delivery tick and
    uniformly select among simultaneously ready items.
    - Partition modes: `UniformSize`, `UniformPartition`, `IsolateSingle`,
    all guarantee at least one node in each partition. Supports symmetric
    and asymmetric partitions.
    
    Note:
    - Pull-based batch delivery: `step()` returns all ready packets for the
    current tick in a single Vec. Packets delivered in the same tick cannot
    trigger chain reactions within
that tick. This was done because it's difficult to do self-referential
    structs in Rust and even with using dynamic dispatch we have to fight
    the borrow checker since we would need mutable access to different
    fields at the same time.
---
 Cargo.lock                                |  26 +
 Cargo.toml                                |   1 +
 DEPENDENCIES.md                           |   2 +
 core/common/Cargo.toml                    |   1 +
 core/common/src/types/consensus/header.rs |   3 +-
 core/simulator/Cargo.toml                 |   4 +
 core/simulator/src/lib.rs                 |   3 +
 core/simulator/src/network.rs             | 150 +++++
 core/simulator/src/packet.rs              | 982 ++++++++++++++++++++++++++++++
 core/simulator/src/ready_queue.rs         | 261 ++++++++
 10 files changed, 1432 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 124dfe960..bdbedc8d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2979,6 +2979,27 @@ dependencies = [
  "syn 2.0.115",
 ]
 
+[[package]]
+name = "enumset"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "25b07a8dfbbbfc0064c0a6bdf9edcf966de6b1c33ce344bdeca3b41615452634"
+dependencies = [
+ "enumset_derive",
+]
+
+[[package]]
+name = "enumset_derive"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f43e744e4ea338060faee68ed933e46e722fb7f3617e722a5772d7e856d8b3ce"
+dependencies = [
+ "darling 0.21.3",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.115",
+]
+
 [[package]]
 name = "equivalent"
 version = "1.0.2"
@@ -4700,6 +4721,7 @@ dependencies = [
  "compio",
  "crossbeam",
  "derive_more",
+ "enumset",
  "err_trail",
  "human-repr",
  "humantime",
@@ -8522,13 +8544,17 @@ dependencies = [
  "bytemuck",
  "bytes",
  "consensus",
+ "enumset",
  "futures",
  "iggy_common",
  "journal",
  "message_bus",
  "metadata",
  "partitions",
+ "rand 0.10.0",
+ "rand_xoshiro",
  "shard",
+ "tracing",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index d3d43c566..b09e40c39 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -143,6 +143,7 @@ dlopen2 = "0.8.2"
 dotenvy = "0.15.7"
 elasticsearch = { version = "9.1.0-alpha.1", features = ["rustls-tls"], 
default-features = false }
 enum_dispatch = "0.3.13"
+enumset = "1.1"
 env_logger = "0.11.9"
 err_trail = { version = "0.11.0", features = ["tracing"] }
 error_set = "0.9.1"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 52bb36d63..2f3a6e0b9 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -257,6 +257,8 @@ embedded-io: 0.6.1, "Apache-2.0 OR MIT",
 encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
 encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
 enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
+enumset: 1.1.10, "Apache-2.0 OR MIT",
+enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
 equivalent: 1.0.2, "Apache-2.0 OR MIT",
 err_trail: 0.11.0, "Apache-2.0",
 errno: 0.3.14, "Apache-2.0 OR MIT",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index ecc75c967..aab4f8746 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -42,6 +42,7 @@ comfy-table = { workspace = true }
 compio = { workspace = true }
 crossbeam = { workspace = true }
 derive_more = { workspace = true }
+enumset = { workspace = true }
 err_trail = { workspace = true }
 human-repr = { workspace = true }
 humantime = { workspace = true }
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index 2f1a83d62..d8a005700 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use bytemuck::{Pod, Zeroable};
+use enumset::EnumSetType;
 use thiserror::Error;
 
 const HEADER_SIZE: usize = 256;
@@ -28,7 +29,7 @@ pub trait ConsensusHeader: Sized + Pod + Zeroable {
     fn size(&self) -> u32;
 }
 
-#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Default, Debug, EnumSetType)]
 #[repr(u8)]
 pub enum Command2 {
     #[default]
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index fad276777..1051c77b5 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -24,10 +24,14 @@ edition = "2024"
 bytemuck = { workspace = true }
 bytes = { workspace = true }
 consensus = { path = "../consensus" }
+enumset = { workspace = true }
 futures = { workspace = true }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 metadata = { path = "../metadata" }
 partitions = { path = "../partitions" }
+rand = { workspace = true }
+rand_xoshiro = { workspace = true }
 shard = { path = "../shard" }
+tracing = { workspace = true }
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 0d0b55bb3..de27135f3 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -18,6 +18,9 @@
 pub mod bus;
 pub mod client;
 pub mod deps;
+pub mod network;
+pub mod packet;
+pub mod ready_queue;
 pub mod replica;
 
 use bus::MemBus;
diff --git a/core/simulator/src/network.rs b/core/simulator/src/network.rs
new file mode 100644
index 000000000..91cbed788
--- /dev/null
+++ b/core/simulator/src/network.rs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Network abstraction layer for the cluster simulator.
+//!
+//! **Note:** Currently a thin passthrough over `PacketSimulator`. Once the
+//! Cluster and MessageBus layers are built, this will own
+//! process-to-bus routing, and node enable/disable logic.
+
+use crate::packet::{
+    ALLOW_ALL, BLOCK_ALL, LinkFilter, Packet, PacketSimulator, 
PacketSimulatorOptions, ProcessId,
+};
+use iggy_common::{header::GenericHeader, message::Message};
+
+/// Network layer for the cluster simulation.
+///
+/// This provides an interface over the `PacketSimulator` for the
+/// `Cluster` orchestrator to use. It handles:
+/// - Submitting packets into the network
+/// - Stepping the network to deliver ready packets
+/// - Managing network partitions and link states
+#[derive(Debug)]
+pub struct Network {
+    simulator: PacketSimulator,
+}
+
+impl Network {
+    /// Create a new network.
+    pub fn new(options: PacketSimulatorOptions) -> Self {
+        Self {
+            simulator: PacketSimulator::new(options),
+        }
+    }
+
+    /// Submit a message into the network.
+    ///
+    /// The message will be queued with a simulated delay and may be:
+    /// - Delivered normally after the delay
+    /// - Dropped (based on packet_loss_probability)
+    /// - Replayed/duplicated (based on replay_probability)
+    pub fn submit(&mut self, from: ProcessId, to: ProcessId, message: 
Message<GenericHeader>) {
+        self.simulator.submit(from, to, message);
+    }
+
+    /// Deliver all ready packets.
+    ///
+    /// The returned `Vec` is taken from an internal buffer. Pass it back via
+    /// [`recycle_buffer`](Self::recycle_buffer) after processing to reuse the
+    /// allocation on the next call.
+    pub fn step(&mut self) -> Vec<Packet> {
+        self.simulator.step()
+    }
+
+    /// Return a previously taken buffer for reuse. See 
[`PacketSimulator::recycle_buffer`].
+    pub fn recycle_buffer(&mut self, buf: Vec<Packet>) {
+        self.simulator.recycle_buffer(buf);
+    }
+
+    /// Advance network time by one tick.
+    ///
+    /// This should be called once per simulation tick, after all ready
+    /// packets have been delivered. Handles automatic partition lifecycle
+    /// and random path clogging.
+    pub fn tick(&mut self) {
+        self.simulator.tick();
+    }
+
+    /// Get the current network tick.
+    pub fn current_tick(&self) -> u64 {
+        self.simulator.current_tick()
+    }
+
+    /// Register a client with the network.
+    ///
+    /// Clients must be registered before they can send or receive packets.
+    pub fn register_client(&mut self, client_id: u128) {
+        self.simulator.register_client(client_id);
+    }
+
+    /// Set the enabled/disabled state of a specific link.
+    /// Maps `enabled = true` to [`ALLOW_ALL`] and `enabled = false` to 
[`BLOCK_ALL`].
+    pub fn set_link_filter(&mut self, from: ProcessId, to: ProcessId, enabled: 
bool) {
+        let filter = self.simulator.link_filter(from, to);
+        *filter = if enabled { ALLOW_ALL } else { BLOCK_ALL };
+    }
+
+    /// Get a mutable reference to a link's command filter.
+    /// Allows per-command filtering (e.g., block only Prepare messages).
+    pub fn link_filter_mut(&mut self, from: ProcessId, to: ProcessId) -> &mut 
LinkFilter {
+        self.simulator.link_filter(from, to)
+    }
+
+    /// Check whether a specific link is enabled (filter is not empty).
+    pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
+        self.simulator.is_link_enabled(from, to)
+    }
+
+    /// Clear all partitions, restoring full connectivity.
+    ///
+    /// **Warning:** resets all link filters to [`ALLOW_ALL`], including
+    /// manually-set per-command filters.
+    pub fn clear_partition(&mut self) {
+        self.simulator.clear_partition();
+    }
+
+    /// Clear all pending packets on a specific link.
+    pub fn link_clear(&mut self, from: ProcessId, to: ProcessId) {
+        self.simulator.link_clear(from, to);
+    }
+
+    /// Returns a mutable reference to the link's optional drop-packet 
predicate.
+    pub fn link_drop_packet_fn(
+        &mut self,
+        from: ProcessId,
+        to: ProcessId,
+    ) -> &mut Option<fn(&Packet) -> bool> {
+        self.simulator.link_drop_packet_fn(from, to)
+    }
+
+    /// Clog a specific link (bidirectionally).
+    ///
+    /// Clogged links do not deliver any packets until unclogged.
+    pub fn clog(&mut self, from: ProcessId, to: ProcessId) {
+        self.simulator.clog(from, to);
+    }
+
+    /// Unclog a specific link (bidirectionally).
+    pub fn unclog(&mut self, from: ProcessId, to: ProcessId) {
+        self.simulator.unclog(from, to);
+    }
+
+    /// Get the number of packets currently in flight.
+    pub fn packets_in_flight(&self) -> usize {
+        self.simulator.packets_in_flight()
+    }
+}
diff --git a/core/simulator/src/packet.rs b/core/simulator/src/packet.rs
new file mode 100644
index 000000000..3981ad7bd
--- /dev/null
+++ b/core/simulator/src/packet.rs
@@ -0,0 +1,982 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Packet simulation layer for deterministic network testing.
+//!
+//! This module provides a packet-level network simulator that models:
+//! - Per-path latency with exponential delay distribution
+//! - Packet loss and replay (duplication) at delivery time
+//! - Automatic network partitioning with configurable lifecycle
+//! - Automatic path clogging with exponential duration
+//! - Link capacity limits with random eviction
+//! - Per-command link filtering via `LinkFilter` (`EnumSet<Command2>`)
+//!
+//! Partitions are implemented via per-link `LinkFilter`s. When a link's
+//! filter is empty, all packets are silently dropped. When specific commands
+//! are removed from the filter, only those command types are blocked.
+//! External code can manipulate individual link filters via
+//! [`PacketSimulator::link_filter`].
+//!
+//! # Delivery model
+//!
+//! This simulator uses **pull-based batch delivery**, 
[`PacketSimulator::step()`] returns
+//! a `Vec<Packet>` containing all packets ready in the current tick. The 
caller
+//! processes the batch after `step()` returns. This means packets delivered in
+//! the same tick cannot trigger chain reactions within that tick.
+
+use crate::ready_queue::{Ready, ReadyQueue};
+use enumset::EnumSet;
+use iggy_common::header::{Command2, GenericHeader};
+use iggy_common::message::Message;
+use rand::RngExt;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+use std::collections::HashMap;
+
+/// Per-link command filter. An `EnumSet<Command2>` where:
+/// - [`ALLOW_ALL`] = all commands pass (link fully enabled)
+/// - [`BLOCK_ALL`] = all commands blocked (link fully disabled/partitioned)
+/// - Custom sets = only matching commands pass through
+pub type LinkFilter = EnumSet<Command2>;
+
+/// Link filter that allows all commands through (link fully enabled).
+pub const ALLOW_ALL: LinkFilter = EnumSet::all();
+
+/// Link filter that blocks all commands (link fully disabled / partitioned).
+pub const BLOCK_ALL: LinkFilter = EnumSet::empty();
+
+/// Identifies a process (replica or client) in the simulation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ProcessId {
+    Replica(u8),
+    Client(u128),
+}
+
+/// A packet in flight through the simulated network.
+#[derive(Debug, Clone)]
+pub struct Packet {
+    pub from: ProcessId,
+    pub to: ProcessId,
+    pub message: Message<GenericHeader>,
+    /// Tick at which this packet becomes deliverable.
+    pub ready_at: u64,
+}
+
+impl Ready for Packet {
+    fn ready_at(&self) -> u64 {
+        self.ready_at
+    }
+}
+
+/// Configuration for the packet simulator.
+#[derive(Debug, Clone)]
+pub struct PacketSimulatorOptions {
+    /// Minimum one-way delay in ticks.
+    pub one_way_delay_min: u64,
+    /// Mean one-way delay in ticks (exponential distribution).
+    pub one_way_delay_mean: u64,
+    /// Probability of dropping a packet at delivery time [0.0, 1.0].
+    pub packet_loss_probability: f64,
+    /// Probability of replaying/duplicating a packet at delivery time [0.0, 
1.0].
+    pub replay_probability: f64,
+    /// Maximum number of packets in a single link's queue.
+    pub link_capacity: u8,
+    /// Probability per tick that a partition occurs (when not partitioned).
+    pub partition_probability: f64,
+    /// Probability per tick that a partition resolves (when partitioned).
+    pub unpartition_probability: f64,
+    /// Minimum ticks a partition lasts.
+    pub partition_stability: u32,
+    /// Minimum ticks of full connectivity before next partition.
+    pub unpartition_stability: u32,
+    /// How partitions are generated.
+    pub partition_mode: PartitionMode,
+    /// Whether partitions are symmetric or asymmetric.
+    pub partition_symmetry: PartitionSymmetry,
+    /// Probability per tick that any given path gets clogged.
+    pub path_clog_probability: f64,
+    /// Mean duration (ticks) of a clog (exponential distribution).
+    pub path_clog_duration_mean: u64,
+    /// Number of replica (node) processes.
+    pub node_count: u8,
+    /// Maximum number of client processes.
+    pub client_count: u8,
+    /// PRNG seed for deterministic behavior.
+    pub seed: u64,
+}
+
+impl Default for PacketSimulatorOptions {
+    fn default() -> Self {
+        Self {
+            one_way_delay_min: 1,
+            one_way_delay_mean: 3,
+            packet_loss_probability: 0.0,
+            replay_probability: 0.0,
+            link_capacity: 64,
+            partition_probability: 0.0,
+            unpartition_probability: 0.0,
+            partition_stability: 0,
+            unpartition_stability: 0,
+            partition_mode: PartitionMode::None,
+            partition_symmetry: PartitionSymmetry::Symmetric,
+            path_clog_probability: 0.0,
+            path_clog_duration_mean: 0,
+            node_count: 1,
+            client_count: 0,
+            seed: 0,
+        }
+    }
+}
+
+/// Per-path link: holds packets in a ReadyQueue sorted by ready_at.
+struct Link {
+    /// Packets waiting to be delivered, ordered by ready_at (min-heap).
+    packets: ReadyQueue<Packet>,
+    /// Tick until which this link is clogged. Clogged when clogged_till > 
current_tick.
+    clogged_till: u64,
+    /// Per-command filter controlling which commands pass through this link.
+    /// [`ALLOW_ALL`] = fully enabled (default), [`BLOCK_ALL`] = fully 
disabled.
+    filter: LinkFilter,
+    /// Optional predicate to drop specific packets. Checked after the link 
filter
+    /// but before the random loss check.
+    drop_packet_fn: Option<fn(&Packet) -> bool>,
+}
+
+impl std::fmt::Debug for Link {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Link")
+            .field("packets", &self.packets)
+            .field("clogged_till", &self.clogged_till)
+            .field("filter", &self.filter)
+            .field("drop_packet_fn", &self.drop_packet_fn.map(|_| "<fn>"))
+            .finish()
+    }
+}
+
+impl Link {
+    fn new(capacity: u8) -> Self {
+        Self {
+            packets: ReadyQueue::with_capacity(capacity as usize),
+            clogged_till: 0,
+            filter: ALLOW_ALL,
+            drop_packet_fn: None,
+        }
+    }
+}
+
+/// Determines how automatic partitions are created.
+/// Only nodes (replicas) are partitioned. There will always be exactly two 
partitions.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum PartitionMode {
+    /// Disable automatic partitioning.
+    #[default]
+    None,
+    /// Draws the size of the partition uniformly at random from [1, n-1].
+    /// Replicas are randomly assigned a partition.
+    UniformSize,
+    /// Assigns each node to a partition uniformly at random.
+    /// Biases towards equal-size partitions.
+    UniformPartition,
+    /// Isolates exactly one random node.
+    IsolateSingle,
+}
+
+/// Whether partitions are symmetric or asymmetric.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum PartitionSymmetry {
+    #[default]
+    Symmetric,
+    Asymmetric,
+}
+
+/// The packet simulator manages a matrix of links between processes.
+pub struct PacketSimulator {
+    options: PacketSimulatorOptions,
+    /// Flat array of links. Index = from_idx * max_processes + to_idx.
+    links: Vec<Link>,
+    /// Maximum number of processes (determines link array size).
+    max_processes: usize,
+    /// Mapping from ProcessId to flat index.
+    process_indices: HashMap<ProcessId, usize>,
+    /// Next flat index to assign to a newly registered client.
+    /// Initialized to `replica_count` and incremented on each new 
registration.
+    next_index: usize,
+    /// Current tick (network global time).
+    current_tick: u64,
+    /// PRNG for deterministic randomness.
+    prng: Xoshiro256Plus,
+    /// Whether an automatic partition is currently active.
+    auto_partition_active: bool,
+    /// Per-node partition assignment (true = partition A, false = partition 
B).
+    auto_partition: Vec<bool>,
+    /// Countdown timer for partition/unpartition stability.
+    auto_partition_stability: u32,
+    /// Scratch buffer for Fisher-Yates shuffle in UniformSize partition mode.
+    auto_partition_nodes: Vec<usize>,
+    /// Reusable buffer for delivered packets.
+    delivered: Vec<Packet>,
+}
+
+impl PacketSimulator {
+    /// Create a new packet simulator.
+    pub fn new(options: PacketSimulatorOptions) -> Self {
+        let node_count = options.node_count as usize;
+        let client_count = options.client_count as usize;
+
+        assert!(node_count > 0, "node_count must be > 0, got {}", node_count);
+        assert!(
+            options.one_way_delay_min >= 1,
+            "one_way_delay_min must be >= 1 (got {}), zero causes unbounded 
replay loops",
+            options.one_way_delay_min
+        );
+        assert!(
+            options.one_way_delay_mean >= options.one_way_delay_min,
+            "one_way_delay_mean ({}) must be >= one_way_delay_min ({})",
+            options.one_way_delay_mean,
+            options.one_way_delay_min
+        );
+        assert!(
+            options.packet_loss_probability >= 0.0 && 
options.packet_loss_probability <= 1.0,
+            "packet_loss_probability must be in [0.0, 1.0], got {}",
+            options.packet_loss_probability
+        );
+        assert!(
+            options.replay_probability >= 0.0 && options.replay_probability <= 
1.0,
+            "replay_probability must be in [0.0, 1.0], got {}",
+            options.replay_probability
+        );
+        assert!(
+            options.partition_probability >= 0.0 && 
options.partition_probability <= 1.0,
+            "partition_probability must be in [0.0, 1.0], got {}",
+            options.partition_probability
+        );
+        assert!(
+            options.unpartition_probability >= 0.0 && 
options.unpartition_probability <= 1.0,
+            "unpartition_probability must be in [0.0, 1.0], got {}",
+            options.unpartition_probability
+        );
+        assert!(
+            options.path_clog_probability >= 0.0 && 
options.path_clog_probability <= 1.0,
+            "path_clog_probability must be in [0.0, 1.0], got {}",
+            options.path_clog_probability
+        );
+
+        let max_processes = node_count + client_count;
+
+        // Pre-register all replicas
+        let mut process_indices = HashMap::new();
+        for i in 0..node_count {
+            process_indices.insert(ProcessId::Replica(i as u8), i);
+        }
+
+        // Create link matrix (NxN)
+        let link_count = max_processes * max_processes;
+        let link_capacity = options.link_capacity;
+        let links = (0..link_count).map(|_| 
Link::new(link_capacity)).collect();
+
+        // Start with unpartition_stability grace period
+        let initial_stability = options.unpartition_stability;
+        let seed = options.seed;
+
+        Self {
+            options,
+            links,
+            max_processes,
+            process_indices,
+            next_index: node_count,
+            current_tick: 0,
+            prng: Xoshiro256Plus::seed_from_u64(seed),
+            auto_partition_active: false,
+            auto_partition: vec![false; node_count],
+            auto_partition_stability: initial_stability,
+            auto_partition_nodes: (0..node_count).collect(),
+            delivered: Vec::new(),
+        }
+    }
+
+    /// Register a client process. Returns its flat index.
+    /// Re-registering a client with the same ID returns the same index.
+    pub fn register_client(&mut self, client_id: u128) -> usize {
+        if let Some(&idx) = 
self.process_indices.get(&ProcessId::Client(client_id)) {
+            return idx;
+        }
+        let idx = self.next_index;
+        assert!(
+            idx < self.max_processes,
+            "Too many processes registered (max: {})",
+            self.max_processes
+        );
+        self.process_indices
+            .insert(ProcessId::Client(client_id), idx);
+        self.next_index += 1;
+        idx
+    }
+
+    /// Resolve a `ProcessId` to its flat index.
+    /// Fast path for replicas (direct arithmetic), HashMap fallback for 
clients.
+    fn process_index(&self, id: ProcessId) -> Option<usize> {
+        match id {
+            ProcessId::Replica(i) => {
+                let idx = i as usize;
+                if idx < self.auto_partition.len() {
+                    Some(idx)
+                } else {
+                    None
+                }
+            }
+            ProcessId::Client(_) => self.process_indices.get(&id).copied(),
+        }
+    }
+
+    /// Get the flat link index for a (from, to) pair.
+    /// Panics on unknown processes.
+    fn link_index(&self, from: ProcessId, to: ProcessId) -> usize {
+        let from_idx = self
+            .process_index(from)
+            .unwrap_or_else(|| panic!("unknown process: {from:?}"));
+        let to_idx = self
+            .process_index(to)
+            .unwrap_or_else(|| panic!("unknown process: {to:?}"));
+        from_idx * self.max_processes + to_idx
+    }
+
+    /// Submit a packet into the network.
+    ///
+    /// Always enqueues the packet. Loss and replay checks happen at delivery
+    /// time in `step()`. If the link is at capacity, a random existing packet
+    /// is evicted to make room.
+    pub fn submit(&mut self, from: ProcessId, to: ProcessId, message: 
Message<GenericHeader>) {
+        let delay = Self::calculate_delay(&mut self.prng, &self.options);
+        let ready_at = self.current_tick.saturating_add(delay);
+
+        let packet = Packet {
+            from,
+            to,
+            message,
+            ready_at,
+        };
+
+        let idx = self.link_index(from, to);
+
+        // If at capacity, evict a random existing packet
+        if self.links[idx].packets.len() >= self.options.link_capacity as 
usize {
+            self.links[idx].packets.remove_random(&mut self.prng);
+            tracing::trace!(?from, ?to, "evicted random packet (link at 
capacity)");
+        }
+        self.links[idx].packets.push(packet);
+    }
+
+    /// Calculate a random delay using exponential distribution.
+    /// Returns max(min, exponential(mean)).
+    fn calculate_delay(prng: &mut Xoshiro256Plus, options: 
&PacketSimulatorOptions) -> u64 {
+        let min = options.one_way_delay_min;
+        let mean = options.one_way_delay_mean;
+        let exp = Self::random_exponential(prng, mean);
+        min.max(exp)
+    }
+
+    /// Generate an exponentially distributed random value with the given mean.
+    /// Uses inverse CDF: -mean * ln(U) where U ~ Uniform(0,1).
+    fn random_exponential(prng: &mut Xoshiro256Plus, mean: u64) -> u64 {
+        let u: f64 = prng.random::<f64>();
+        if u > 0.0 {
+            (-(mean as f64) * u.ln()) as u64
+        } else {
+            // Fallback for u == 0.0 (ln(0) is -inf).
+            mean.saturating_mul(20)
+        }
+    }
+
+    /// Number of registered processes.
+    fn process_count(&self) -> usize {
+        self.next_index
+    }
+
+    /// Returns a mutable reference to the link's filter.
+    /// This is the per-link command filter — `EnumSet<Command2>`.
+    /// Set to [`BLOCK_ALL`] to block all packets (partition).
+    /// Set to [`ALLOW_ALL`] to allow all packets (default).
+    /// Remove specific commands to selectively filter.
+    pub fn link_filter(&mut self, from: ProcessId, to: ProcessId) -> &mut 
LinkFilter {
+        let idx = self.link_index(from, to);
+        &mut self.links[idx].filter
+    }
+
+    /// Check whether a link is currently enabled (filter is not empty).
+    pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
+        let idx = self.link_index(from, to);
+        !self.links[idx].filter.is_empty()
+    }
+
+    /// Clear all pending packets on a specific link.
+    pub fn link_clear(&mut self, from: ProcessId, to: ProcessId) {
+        let idx = self.link_index(from, to);
+        self.links[idx].packets.clear();
+    }
+
+    /// Returns a mutable reference to the link's optional drop-packet 
predicate.
+    ///
+    /// When set, the predicate is called for each packet after the link 
filter check
+    /// but before the random loss check. If it returns `true`, the packet is 
dropped.
+    pub fn link_drop_packet_fn(
+        &mut self,
+        from: ProcessId,
+        to: ProcessId,
+    ) -> &mut Option<fn(&Packet) -> bool> {
+        let idx = self.link_index(from, to);
+        &mut self.links[idx].drop_packet_fn
+    }
+
+    // TODO: implement record/replay_recorded for deterministic replay support.
+
    /// Deliver all packets that are ready at the current tick.
    /// Returns a `Vec` of packets that should be delivered. Does NOT advance the tick.
    ///
    /// This returns the full batch at once. Packets delivered in the same tick cannot trigger
    /// chain reactions within that tick - the caller processes them after this returns.
    ///
    /// The returned `Vec` is taken from an internal buffer via `std::mem::take`.
    /// After processing, pass it back via [`recycle_buffer`](Self::recycle_buffer).
    ///
    /// At delivery time, the following happens:
    /// 1. If link is clogged -> skip (don't dequeue, clogging delays delivery)
    /// 2. Remove a random ready packet via reservoir sampling
    /// 3. If packet's command is not in the link filter -> drop
    /// 4. If `drop_packet_fn` returns true -> drop
    /// 5. Random packet loss -> drop
    /// 6. Random replay -> clone and re-enqueue with new delay
    /// 7. If survived -> include in delivered vec
    pub fn step(&mut self) -> Vec<Packet> {
        // Defensive clear in case the caller never recycled the buffer.
        self.delivered.clear();

        // Destructure `self` so `links` can be borrowed mutably while `prng`,
        // `options`, `current_tick` and `delivered` are used inside the loop.
        let Self {
            links,
            prng,
            options,
            current_tick,
            delivered,
            max_processes,
            next_index,
            ..
        } = self;

        // Only iterate over processes that have actually been registered.
        let process_count = *next_index;

        for from in 0..process_count {
            for to in 0..process_count {
                // Links are stored in a flat row-major (from, to) matrix.
                let idx = from * *max_processes + to;
                let link = &mut links[idx];

                // Clogged links don't deliver — packets stay in queue
                if link.clogged_till > *current_tick {
                    continue;
                }

                // Drain every ready packet on this link for the current tick.
                loop {
                    let Some(packet) = link.packets.remove_ready(prng, *current_tick) else {
                        break;
                    };

                    // Per-command link filter check: drop if command not in filter
                    let command = packet.message.header().command;
                    if !link.filter.contains(command) {
                        tracing::trace!(?command, "packet dropped (command filtered)");
                        continue;
                    }

                    // Custom drop predicate check
                    if let Some(should_drop) = link.drop_packet_fn
                        && should_drop(&packet)
                    {
                        tracing::trace!("packet dropped (drop_packet_fn)");
                        continue;
                    }

                    // Random loss check
                    if prng.random::<f64>() < options.packet_loss_probability {
                        tracing::trace!("packet dropped (loss probability)");
                        continue;
                    }

                    // Random replay check: clone and re-enqueue with eviction at capacity
                    // NOTE(review): if `calculate_delay` can return 0, the replay is
                    // ready at this same tick and may be dequeued again by this
                    // loop — TODO confirm the delay is always >= 1.
                    if prng.random::<f64>() < options.replay_probability {
                        let delay = Self::calculate_delay(prng, options);
                        let replay = Packet {
                            ready_at: (*current_tick).saturating_add(delay),
                            ..packet.clone()
                        };
                        if link.packets.len() >= options.link_capacity as usize {
                            link.packets.remove_random(prng);
                            tracing::trace!("evicted random packet for replay (link at capacity)");
                        }
                        link.packets.push(replay);
                        tracing::trace!("packet replayed");
                    }

                    delivered.push(packet);
                }
            }
        }

        // Hand the batch to the caller, leaving an empty Vec in its place.
        std::mem::take(&mut self.delivered)
    }
+
+    /// Return a previously taken buffer for reuse.
+    pub fn recycle_buffer(&mut self, mut buf: Vec<Packet>) {
+        buf.clear();
+        self.delivered = buf;
+    }
+
    /// Advance the network's tick counter.
    /// Handles automatic partition lifecycle and random path clogging.
    ///
    /// The order of `prng` draws here is fixed; changing it would change the
    /// random sequence and break deterministic replay of a given seed.
    pub fn tick(&mut self) {
        self.current_tick += 1;

        // Partition lifecycle
        // A single stability counter freezes whichever state (partitioned or
        // not) is current: while it is non-zero, no transition can happen.
        if self.auto_partition_stability > 0 {
            self.auto_partition_stability -= 1;
        } else if self.auto_partition_active {
            if self.prng.random::<f64>() < self.options.unpartition_probability {
                self.auto_partition_active = false;
                self.auto_partition_stability = self.options.unpartition_stability;
                self.auto_partition.iter_mut().for_each(|p| *p = false);
                // This resets ALL link filters to ALLOW_ALL, cancelling any
                // manually-set per-command filters. Use partition_mode: None if you
                // need custom filters to persist across partition cycles.
                for link in &mut self.links {
                    link.filter = ALLOW_ALL;
                }
                tracing::debug!(partition = ?self.auto_partition, "unpartitioned network");
            }
        } else if self.options.node_count > 1
            && self.prng.random::<f64>() < self.options.partition_probability
        {
            self.auto_partition_network();
            tracing::debug!(partition = ?self.auto_partition, "partitioned network");
        }

        // Random path clogging
        // Each directed path gets an independent clog roll every tick.
        let process_count = self.process_count();
        for from in 0..process_count {
            for to in 0..process_count {
                if self.prng.random::<f64>() < self.options.path_clog_probability {
                    let duration = Self::random_exponential(
                        &mut self.prng,
                        self.options.path_clog_duration_mean,
                    );
                    let idx = from * self.max_processes + to;
                    self.links[idx].clogged_till = self.current_tick.saturating_add(duration);
                    tracing::debug!(from, to, duration, "path clogged");
                }
            }
        }
    }
+
    /// Partition the network into two groups. Guaranteed to isolate at least one
    /// replica (except under `PartitionMode::None`, which leaves all nodes in the
    /// same group and therefore all links enabled).
    fn auto_partition_network(&mut self) {
        let node_count = self.options.node_count as usize;
        assert!(node_count > 1);

        match self.options.partition_mode {
            PartitionMode::None => {
                // All nodes in the "false" group: the filter pass below then
                // enables every link.
                self.auto_partition.iter_mut().for_each(|p| *p = false);
            }
            PartitionMode::UniformSize => {
                // Size drawn from 1..node_count, so both sides are non-empty.
                let partition_size = self.prng.random_range(1..node_count);
                // Reset and shuffle pre-allocated node indices using Fisher-Yates
                for (i, node) in self.auto_partition_nodes.iter_mut().enumerate() {
                    *node = i;
                }
                for i in (1..node_count).rev() {
                    let j = self.prng.random_range(0..=i);
                    self.auto_partition_nodes.swap(i, j);
                }
                // The first `partition_size` shuffled nodes form one side.
                for (i, &node) in self.auto_partition_nodes.iter().enumerate() {
                    self.auto_partition[node] = i < partition_size;
                }
            }
            PartitionMode::UniformPartition => {
                // Each node picks a side with an independent fair coin flip.
                let mut all_same = true;
                self.auto_partition[0] = self.prng.random::<bool>();
                for i in 1..node_count {
                    self.auto_partition[i] = self.prng.random::<bool>();
                    if self.auto_partition[i] != self.auto_partition[i - 1] {
                        all_same = false;
                    }
                }
                if all_same {
                    // Force at least one node into the opposite partition.
                    let n = self.prng.random_range(0..node_count);
                    self.auto_partition[n] = !self.auto_partition[0];
                }
            }
            PartitionMode::IsolateSingle => {
                // Exactly one randomly chosen node alone on its own side.
                self.auto_partition.iter_mut().for_each(|p| *p = false);
                let n = self.prng.random_range(0..node_count);
                self.auto_partition[n] = true;
            }
        }

        self.auto_partition_active = true;
        self.auto_partition_stability = self.options.partition_stability;

        // Apply partition to links
        // Under Asymmetric symmetry, one randomly chosen side keeps its
        // outgoing links across the cut enabled.
        let asymmetric_side = self.prng.random::<bool>();
        let process_count = self.process_count();

        for from in 0..process_count {
            for to in 0..process_count {
                // Processes at index >= node_count (clients) never take part
                // in the partition; their links stay enabled.
                let from_is_node = from < self.options.node_count as usize;
                let to_is_node = to < self.options.node_count as usize;

                let enabled = !from_is_node
                    || !to_is_node
                    || self.auto_partition[from] == self.auto_partition[to]
                    || (self.options.partition_symmetry == PartitionSymmetry::Asymmetric
                        && self.auto_partition[from] == asymmetric_side);

                let idx = from * self.max_processes + to;
                self.links[idx].filter = if enabled { ALLOW_ALL } else { BLOCK_ALL };
            }
        }
    }
+
+    /// Get the current tick.
+    pub fn current_tick(&self) -> u64 {
+        self.current_tick
+    }
+
+    /// Clear all partitions, restoring full connectivity.
+    ///
+    /// Resets auto partition state and sets **all** link filters to 
`ALLOW_ALL`,
+    /// including any manually-set per-command filters. If you need custom 
filters
+    /// to survive partition clearing, set `partition_mode: None` and manage
+    /// partitions manually.
+    pub fn clear_partition(&mut self) {
+        self.auto_partition_active = false;
+        self.auto_partition_stability = self.options.unpartition_stability;
+        self.auto_partition.iter_mut().for_each(|p| *p = false);
+        for link in &mut self.links {
+            link.filter = ALLOW_ALL;
+        }
+        tracing::debug!("partitions cleared");
+    }
+
+    /// Clog a specific link (both directions) indefinitely until `unclog()` 
is called.
+    pub fn clog(&mut self, from: ProcessId, to: ProcessId) {
+        let idx = self.link_index(from, to);
+        self.links[idx].clogged_till = u64::MAX;
+        let idx = self.link_index(to, from);
+        self.links[idx].clogged_till = u64::MAX;
+        tracing::debug!(?from, ?to, "link clogged");
+    }
+
+    /// Unclog a specific link (both directions).
+    pub fn unclog(&mut self, from: ProcessId, to: ProcessId) {
+        let idx = self.link_index(from, to);
+        self.links[idx].clogged_till = 0;
+        let idx = self.link_index(to, from);
+        self.links[idx].clogged_till = 0;
+        tracing::debug!(?from, ?to, "link unclogged");
+    }
+
+    /// Get the number of packets in flight across all links.
+    pub fn packets_in_flight(&self) -> usize {
+        self.links.iter().map(|l| l.packets.len()).sum()
+    }
+}
+
+impl std::fmt::Debug for PacketSimulator {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PacketSimulator")
+            .field("current_tick", &self.current_tick)
+            .field("max_processes", &self.max_processes)
+            .field("processes_registered", &self.process_indices.len())
+            .field("packets_in_flight", &self.packets_in_flight())
+            .field("auto_partition_active", &self.auto_partition_active)
+            .finish()
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Smallest valid message: a bare `GenericHeader` and no payload.
    fn create_test_message() -> Message<GenericHeader> {
        Message::<GenericHeader>::new(std::mem::size_of::<GenericHeader>())
    }

    /// Build a header-only message with `command` set, so per-command link
    /// filters have something to match against.
    fn create_test_message_with_command(command: Command2) -> Message<GenericHeader> {
        let size = std::mem::size_of::<GenericHeader>();
        let mut buf = vec![0u8; size];
        let header: &mut GenericHeader = bytemuck::from_bytes_mut(&mut buf);
        header.command = command;
        Message::<GenericHeader>::from_bytes(bytes::Bytes::from(buf)).unwrap()
    }

    /// Helper: disable all links to/from a given replica (isolate it).
    fn isolate_replica(sim: &mut PacketSimulator, replica: u8, replica_count: u8) {
        for i in 0..replica_count {
            if i != replica {
                *sim.link_filter(ProcessId::Replica(i), ProcessId::Replica(replica)) = BLOCK_ALL;
                *sim.link_filter(ProcessId::Replica(replica), ProcessId::Replica(i)) = BLOCK_ALL;
            }
        }
    }

    #[test]
    fn test_basic_packet_delivery() {
        // Deterministic options: no loss, no replay, fixed seed.
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            packet_loss_probability: 0.0,
            replay_probability: 0.0,
            link_capacity: 64,
            node_count: 3,
            client_count: 1,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);

        let msg = create_test_message();
        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg);

        // At tick 0, packet not ready (delay >= 1)
        assert!(sim.step().is_empty());

        // Advance ticks until delivery (exponential distribution, so may need more than 1)
        for _ in 0..20 {
            sim.tick();
        }

        let delivered = sim.step();
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].from, ProcessId::Replica(0));
        assert_eq!(delivered[0].to, ProcessId::Replica(1));
    }

    #[test]
    fn test_partition_drops_not_buffers() {
        // Verify that packets sent during a partition are dropped, not buffered
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            node_count: 3,
            client_count: 0,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);
        isolate_replica(&mut sim, 1, 3);

        let msg = create_test_message();

        // Send packet during partition
        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg.clone());
        for _ in 0..20 {
            sim.tick();
            sim.step(); // drain and drop
        }

        // Clear partition
        sim.clear_partition();

        // The old packet should NOT be delivered — it was dropped
        let delivered = sim.step();
        assert!(delivered.is_empty());

        // New packet should work after partition clears
        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg);
        for _ in 0..20 {
            sim.tick();
        }
        let delivered = sim.step();
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].to, ProcessId::Replica(1));
    }

    #[test]
    fn test_clog_unclog() {
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            node_count: 2,
            client_count: 0,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);
        let msg = create_test_message();

        // Clog the link
        sim.clog(ProcessId::Replica(0), ProcessId::Replica(1));

        // Submit a packet
        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg.clone());
        for _ in 0..20 {
            sim.tick();
        }

        // Should not be delivered (clogged)
        assert!(sim.step().is_empty());
        // Packet should still be in flight (buffered, not dropped)
        assert_eq!(sim.packets_in_flight(), 1);

        // Unclog
        sim.unclog(ProcessId::Replica(0), ProcessId::Replica(1));

        // Now it should deliver
        let delivered = sim.step();
        assert_eq!(delivered.len(), 1);
    }

    #[test]
    fn test_auto_partition_lifecycle() {
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            partition_probability: 1.0,   // always partition
            unpartition_probability: 1.0, // always unpartition
            partition_stability: 5,       // partition lasts at least 5 ticks
            unpartition_stability: 3,     // unpartition lasts at least 3 ticks
            partition_mode: PartitionMode::IsolateSingle,
            node_count: 3,
            client_count: 0,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);

        // Initially no partition
        assert!(!sim.auto_partition_active);

        // Initial grace period: unpartition_stability = 3 ticks of guaranteed connectivity
        for _ in 0..3 {
            sim.tick();
            assert!(
                !sim.auto_partition_active,
                "should not partition during initial grace period"
            );
        }

        // After grace period, should partition (probability = 1.0)
        sim.tick();
        assert!(sim.auto_partition_active);
        let disabled_count = sim.links.iter().filter(|l| l.filter.is_empty()).count();
        assert!(disabled_count > 0, "partition should disable some links");

        // Partition should be stable for 5 ticks
        for _ in 0..5 {
            sim.tick();
            assert!(sim.auto_partition_active, "partition should be stable");
        }

        // After stability period, should unpartition (probability = 1.0)
        sim.tick();
        assert!(!sim.auto_partition_active);
        assert!(sim.links.iter().all(|l| l.filter == ALLOW_ALL));

        // Unpartition should be stable for 3 ticks
        for _ in 0..3 {
            sim.tick();
            assert!(!sim.auto_partition_active, "unpartition should be stable");
        }

        // After unpartition stability, should partition again
        sim.tick();
        assert!(sim.auto_partition_active);
    }

    #[test]
    fn test_loss_at_delivery_time() {
        // With 100% loss, packets should be enqueued but dropped at delivery
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            packet_loss_probability: 1.0,
            node_count: 2,
            client_count: 0,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);
        let msg = create_test_message();

        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), msg);

        // Packet should be enqueued
        assert_eq!(sim.packets_in_flight(), 1);

        for _ in 0..20 {
            sim.tick();
        }

        // Packet should be dequeued and dropped at delivery
        let delivered = sim.step();
        assert!(delivered.is_empty());
        assert_eq!(sim.packets_in_flight(), 0);
    }

    #[test]
    fn test_command_level_filtering() {
        // Test that per-command filtering works: allow Ping but block Prepare
        let options = PacketSimulatorOptions {
            one_way_delay_min: 1,
            one_way_delay_mean: 1,
            packet_loss_probability: 0.0,
            replay_probability: 0.0,
            link_capacity: 64,
            node_count: 2,
            client_count: 0,
            seed: 42,
            ..Default::default()
        };

        let mut sim = PacketSimulator::new(options);

        // Set filter to only allow Ping on link 0->1
        let filter = sim.link_filter(ProcessId::Replica(0), ProcessId::Replica(1));
        *filter = EnumSet::only(Command2::Ping);

        // Submit a Ping message and a Prepare message
        let ping_msg = create_test_message_with_command(Command2::Ping);
        let prepare_msg = create_test_message_with_command(Command2::Prepare);

        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), ping_msg);
        sim.submit(ProcessId::Replica(0), ProcessId::Replica(1), prepare_msg);

        // Advance time so both are ready
        for _ in 0..20 {
            sim.tick();
        }

        let delivered = sim.step();

        // Only the Ping should be delivered
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].message.header().command, Command2::Ping);

        // Nothing left in flight
        assert_eq!(sim.packets_in_flight(), 0);
    }
}
diff --git a/core/simulator/src/ready_queue.rs 
b/core/simulator/src/ready_queue.rs
new file mode 100644
index 000000000..0bd91c156
--- /dev/null
+++ b/core/simulator/src/ready_queue.rs
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use rand::{Rng, RngExt};
+
/// Trait for items stored in a `ReadyQueue`. Must expose a tick at which
/// the item becomes ready for delivery.
/// This trait will be used in the storage simulator in the future.
pub trait Ready {
    /// The simulation tick at or after which this item may be removed
    /// via `ReadyQueue::remove_ready`. Also the heap ordering key.
    fn ready_at(&self) -> u64;
}
+
/// A min-heap priority queue with reservoir-sampled random ready removal.
///
/// `remove_ready()` picks a uniformly random item
/// from among all items whose `ready_at <= tick`, using reservoir sampling
/// over the heap's tree structure. The min-heap property allows pruning entire
/// subtrees whose root is not yet ready.
#[derive(Debug)]
pub struct ReadyQueue<T> {
    // Implicit binary min-heap keyed by `ready_at`: the children of
    // `items[i]` live at `items[2i + 1]` and `items[2i + 2]`.
    items: Vec<T>,
}
+
+impl<T: Ready> Default for ReadyQueue<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
impl<T: Ready> ReadyQueue<T> {
    /// Create an empty queue with no pre-allocated capacity.
    pub fn new() -> Self {
        Self { items: Vec::new() }
    }

    /// Create an empty queue that can hold `capacity` items before reallocating.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
        }
    }

    /// Push an item onto the queue, maintaining the min-heap property.
    pub fn push(&mut self, item: T) {
        self.items.push(item);
        let last = self.items.len() - 1;
        self.sift_up(last);
    }

    /// Peek at the item with the smallest `ready_at`.
    pub fn peek(&self) -> Option<&T> {
        // The heap root is always at index 0.
        self.items.first()
    }

    /// Reset the queue, removing all items but retaining the allocation.
    /// Matches TigerBeetle's `reset()` which sets `items.len = 0`.
    pub fn clear(&mut self) {
        self.items.clear();
    }

    /// Remove a uniformly random ready item (one whose `ready_at <= tick`).
    ///
    /// Returns `None` if no items are ready.
    pub fn remove_ready(&mut self, prng: &mut impl Rng, tick: u64) -> Option<T> {
        // Min-heap invariant: if the root isn't ready, nothing is.
        let top = self.peek()?;
        if top.ready_at() > tick {
            return None;
        }

        let root = self.pick_random_ready(prng, tick, 0);
        debug_assert!(root.count > 0);
        debug_assert!(root.pick < self.items.len());

        let result = self.remove_at(root.pick);
        debug_assert!(result.ready_at() <= tick);
        Some(result)
    }

    /// Remove a uniformly random item (regardless of readiness).
    /// Used for capacity eviction.
    pub fn remove_random(&mut self, prng: &mut impl Rng) -> Option<T> {
        if self.items.is_empty() {
            return None;
        }
        let index = prng.random_range(0..self.items.len());
        Some(self.remove_at(index))
    }

    /// Remove the item at `index`, maintaining the heap property.
    /// Swaps with the last element, then sifts up or down as needed.
    pub(crate) fn remove_at(&mut self, index: usize) -> T {
        assert!(index < self.items.len());
        let last = self.items.len() - 1;
        if index == last {
            // Removing the tail can never violate the heap property.
            return self.items.pop().unwrap();
        }
        self.items.swap(index, last);
        let removed = self.items.pop().unwrap();

        // The element now at `index` came from `last`. It might need to move
        // up or down to restore the heap property.
        let new_pos = self.sift_up(index);
        if new_pos == index {
            // sift_up didn't move it, so the violation (if any) is downward.
            self.sift_down(index);
        }

        removed
    }

    /// Number of items in the queue.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Whether the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Access all items in unspecified order.
    pub fn as_slice(&self) -> &[T] {
        &self.items
    }

    /// Sift an element up toward the root. Returns the final index.
    fn sift_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            // Strict `<`: equal keys don't swap, so sifting terminates.
            if self.items[index].ready_at() < self.items[parent].ready_at() {
                self.items.swap(index, parent);
                index = parent;
            } else {
                break;
            }
        }
        index
    }

    /// Sift an element down away from the root.
    fn sift_down(&mut self, mut index: usize) {
        let len = self.items.len();
        loop {
            let left = index * 2 + 1;
            let right = index * 2 + 2;
            let mut smallest = index;

            if left < len && self.items[left].ready_at() < self.items[smallest].ready_at() {
                smallest = left;
            }
            if right < len && self.items[right].ready_at() < self.items[smallest].ready_at() {
                smallest = right;
            }

            if smallest == index {
                break;
            }
            self.items.swap(index, smallest);
            index = smallest;
        }
    }

    /// Reservoir sampling over the heap subtree rooted at `index`.
    ///
    /// Returns the total count of ready items in the subtree and the index
    /// of a uniformly random ready item among them.
    fn pick_random_ready(&self, prng: &mut impl Rng, tick: u64, index: usize) -> SubtreePick {
        // Prune: in a min-heap, a non-ready node implies a fully non-ready subtree.
        if index >= self.items.len() || self.items[index].ready_at() > tick {
            return SubtreePick { pick: 0, count: 0 };
        }

        // Start with the current node as our pick.
        let mut result = SubtreePick {
            pick: index,
            count: 1,
        };

        // Visit both children.
        for child_index in [index * 2 + 1, index * 2 + 2] {
            let subtree = self.pick_random_ready(prng, tick, child_index);
            if subtree.count > 0 {
                let denominator = result.count + subtree.count;
                // Replace our pick with the subtree's pick with probability
                // subtree.count / denominator (reservoir sampling).
                if prng.random_range(0..denominator) < subtree.count {
                    result.pick = subtree.pick;
                }
                result.count = denominator;
            }
        }

        result
    }
}
+
/// Result of reservoir sampling over one heap subtree: the index of the
/// chosen ready item and how many ready items the subtree contains.
struct SubtreePick {
    // Index into `ReadyQueue::items`; meaningless when `count == 0`.
    pick: usize,
    // Number of ready items found in the subtree.
    count: usize,
}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal `Ready` implementor: a readiness tick and no payload.
    #[derive(Debug, Clone)]
    struct TestItem {
        ready_at: u64,
    }

    impl Ready for TestItem {
        fn ready_at(&self) -> u64 {
            self.ready_at
        }
    }

    /// Fixed-seed PRNG so test outcomes are deterministic.
    fn make_prng() -> rand_xoshiro::Xoshiro256Plus {
        use rand_xoshiro::rand_core::SeedableRng;
        rand_xoshiro::Xoshiro256Plus::seed_from_u64(42)
    }

    #[test]
    fn test_heap_property() {
        let mut q = ReadyQueue::new();
        q.push(TestItem { ready_at: 10 });
        q.push(TestItem { ready_at: 5 });
        q.push(TestItem { ready_at: 15 });
        q.push(TestItem { ready_at: 1 });

        assert_eq!(q.len(), 4);
        // The minimum key must surface at the root regardless of insert order.
        assert_eq!(q.peek().unwrap().ready_at, 1);
    }

    #[test]
    fn test_remove_ready_returns_ready_items() {
        let mut q = ReadyQueue::new();
        q.push(TestItem { ready_at: 5 });
        q.push(TestItem { ready_at: 10 });
        q.push(TestItem { ready_at: 3 });

        let mut prng = make_prng();

        // At tick 4, only the item with ready_at=3 is ready
        let item = q.remove_ready(&mut prng, 4).unwrap();
        assert_eq!(item.ready_at, 3);
        assert_eq!(q.len(), 2);

        // At tick 4, the remaining items (ready_at=5, 10) are not yet ready
        assert!(q.remove_ready(&mut prng, 4).is_none());
    }
}

Reply via email to