This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch drain-bus
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/drain-bus by this push:
     new 04d8255e7 feat(message_bus): add drain method to collect buffered outbound messages
04d8255e7 is described below

commit 04d8255e71318b5e86fc9c252472774d9c775ea5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 18:53:14 2026 +0100

    feat(message_bus): add drain method to collect buffered outbound messages
    
    MessageBus only had send_to_client/send_to_replica but no
    batched retrieval for actual delivery. Add drain(&self, buf)
    following the drain_loopback_into pattern. IggyMessageBus
    buffers in a RefCell<VecDeque> outbox; MemBus drains from
    its pending_messages queue.
---
 core/consensus/src/plane_helpers.rs |   4 ++
 core/message_bus/src/lib.rs         |  50 ++++++++++++++-
 core/simulator/src/bus.rs           | 125 +++++++++++++++++++++++++++++++++++-
 3 files changed, 175 insertions(+), 4 deletions(-)

diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 55d21534a..3e0018371 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -399,6 +399,8 @@ mod tests {
         ) -> Result<(), IggyError> {
             Ok(())
         }
+
+        fn drain(&self, _buf: &mut Vec<Self::Data>) {}
     }
 
     fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> {
@@ -607,6 +609,8 @@ mod tests {
             self.sent.borrow_mut().push((replica, data));
             Ok(())
         }
+
+        fn drain(&self, _buf: &mut Vec<Self::Data>) {}
     }
 
     #[test]
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 876061bc4..83356d6e6 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -20,7 +20,9 @@ use crate::cache::connection::{
     ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections,
 };
 use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader, message::Message};
-use std::{collections::HashMap, rc::Rc};
+use std::cell::RefCell;
+use std::collections::{HashMap, VecDeque};
+use std::rc::Rc;
 
 /// Message bus parameterized by allocation strategy and sharded state
 pub trait MessageBus {
@@ -46,6 +48,12 @@ pub trait MessageBus {
         replica: Self::Replica,
         data: Self::Data,
     ) -> impl Future<Output = Result<(), IggyError>>;
+
+    /// Drain all buffered outbound messages into `buf`, leaving the internal queue empty.
+    ///
+    /// Messages are enqueued by [`MessageBus::send_to_client`] / [`MessageBus::send_to_replica`].
+    /// The caller is responsible for dispatching them to their targets.
+    fn drain(&self, buf: &mut Vec<Self::Data>);
 }
 
 // TODO: explore generics for Strategy
@@ -54,6 +62,7 @@ pub struct IggyMessageBus {
     clients: HashMap<u128, SenderKind>,
     replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
     shard_id: u16,
+    outbox: RefCell<VecDeque<Message<GenericHeader>>>,
 }
 
 impl IggyMessageBus {
@@ -69,6 +78,7 @@ impl IggyMessageBus {
                 },
             },
             shard_id,
+            outbox: RefCell::new(VecDeque::new()),
         }
     }
 
@@ -112,25 +122,59 @@ impl MessageBus for IggyMessageBus {
     async fn send_to_client(
         &self,
         client_id: Self::Client,
-        _message: Self::Data,
+        message: Self::Data,
     ) -> Result<(), IggyError> {
         #[allow(clippy::cast_possible_truncation)] // IggyError::ClientNotFound takes u32
         let _sender = self
             .clients
             .get(&client_id)
             .ok_or(IggyError::ClientNotFound(client_id as u32))?;
+        self.outbox.borrow_mut().push_back(message);
         Ok(())
     }
 
     async fn send_to_replica(
         &self,
         replica: Self::Replica,
-        _message: Self::Data,
+        message: Self::Data,
     ) -> Result<(), IggyError> {
         // TODO: Handle lazily creating the connection.
         let _connection = self
             .get_replica_connection(replica)
             .ok_or(IggyError::ResourceNotFound(format!("Replica {replica}")))?;
+        self.outbox.borrow_mut().push_back(message);
         Ok(())
     }
+
+    fn drain(&self, buf: &mut Vec<Self::Data>) {
+        buf.extend(self.outbox.borrow_mut().drain(..));
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn drain_empty_yields_nothing() {
+        let bus = IggyMessageBus::new(1, 0, 42);
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn double_drain_second_empty() {
+        let bus = IggyMessageBus::new(1, 0, 42);
+        let msg = Message::<GenericHeader>::new(std::mem::size_of::<GenericHeader>());
+        bus.outbox.borrow_mut().push_back(msg);
+
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert_eq!(buf.len(), 1);
+
+        buf.clear();
+        bus.drain(&mut buf);
+        assert!(buf.is_empty());
+    }
 }
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 371258aef..a0ae7cda5 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -51,13 +51,21 @@ impl MemBus {
         }
     }
 
-    /// Get the next pending message from the bus
+    /// Get the next pending message from the bus.
     ///
     /// # Panics
     /// Panics if the internal mutex is poisoned.
     pub fn receive(&self) -> Option<Envelope> {
         self.pending_messages.lock().unwrap().pop_front()
     }
+
+    /// Drain all pending envelopes into `buf`, preserving routing metadata.
+    ///
+    /// # Panics
+    /// Panics if the internal mutex is poisoned.
+    pub fn drain_envelopes(&self, buf: &mut Vec<Envelope>) {
+        buf.extend(self.pending_messages.lock().unwrap().drain(..));
+    }
 }
 
 impl MessageBus for MemBus {
@@ -128,6 +136,16 @@ impl MessageBus for MemBus {
 
         Ok(())
     }
+
+    fn drain(&self, buf: &mut Vec<Self::Data>) {
+        buf.extend(
+            self.pending_messages
+                .lock()
+                .unwrap()
+                .drain(..)
+                .map(|envelope| envelope.message),
+        );
+    }
 }
 
 /// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
@@ -178,4 +196,109 @@ impl MessageBus for SharedMemBus {
     ) -> Result<(), IggyError> {
         self.0.send_to_replica(replica, message).await
     }
+
+    fn drain(&self, buf: &mut Vec<Self::Data>) {
+        self.0.drain(buf);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_common::header::GenericHeader;
+
+    fn make_message() -> Message<GenericHeader> {
+        Message::<GenericHeader>::new(std::mem::size_of::<GenericHeader>())
+    }
+
+    #[test]
+    fn drain_empty_yields_nothing() {
+        let bus = MemBus::new();
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn send_to_replica_then_drain() {
+        let mut bus = MemBus::new();
+        bus.add_replica(0);
+
+        futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+        futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert_eq!(buf.len(), 2);
+    }
+
+    #[test]
+    fn send_to_client_then_drain() {
+        let mut bus = MemBus::new();
+        bus.add_client(42, ());
+
+        futures::executor::block_on(bus.send_to_client(42, make_message())).unwrap();
+
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert_eq!(buf.len(), 1);
+    }
+
+    #[test]
+    fn double_drain_second_empty() {
+        let mut bus = MemBus::new();
+        bus.add_replica(0);
+
+        futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert_eq!(buf.len(), 1);
+
+        buf.clear();
+        bus.drain(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn send_to_unknown_replica_errors_and_drain_empty() {
+        let bus = MemBus::new();
+        let result = futures::executor::block_on(bus.send_to_replica(99, make_message()));
+        assert!(result.is_err());
+
+        let mut buf = Vec::new();
+        bus.drain(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn drain_envelopes_preserves_routing() {
+        let mut bus = MemBus::new();
+        bus.add_replica(1);
+        bus.add_client(42, ());
+
+        futures::executor::block_on(bus.send_to_replica(1, make_message())).unwrap();
+        futures::executor::block_on(bus.send_to_client(42, make_message())).unwrap();
+
+        let mut envelopes = Vec::new();
+        bus.drain_envelopes(&mut envelopes);
+        assert_eq!(envelopes.len(), 2);
+        assert_eq!(envelopes[0].to_replica, Some(1));
+        assert_eq!(envelopes[0].to_client, None);
+        assert_eq!(envelopes[1].to_client, Some(42));
+        assert_eq!(envelopes[1].to_replica, None);
+    }
+
+    #[test]
+    fn shared_mem_bus_drain_delegates() {
+        let bus = Arc::new(MemBus::new());
+        let mut shared = SharedMemBus(bus);
+        shared.add_replica(0);
+
+        futures::executor::block_on(shared.send_to_replica(0, make_message())).unwrap();
+
+        let mut buf = Vec::new();
+        shared.drain(&mut buf);
+        assert_eq!(buf.len(), 1);
+    }
 }

Reply via email to