fresh-borzoni commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2898639567


##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
         )
     }
 
-    pub async fn close(&mut self) {
-        self.running = false;
+    /// Event-loop sender: drain batches and fire RPCs into a `FuturesUnordered`,
+    /// then process responses as they arrive. This interleaves drain cycles with
+    /// response handling — when a fast leader responds, we immediately drain and
+    /// send more batches for its buckets while slow leaders are still in-flight.
+    ///
+    /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+    /// overlap, but single-task cooperative multitasking — no cross-thread
+    /// synchronization needed for response handling.
+    pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>) -> Result<()> {
+        let mut pending: FuturesUnordered<SendFuture<'_>> = FuturesUnordered::new();
+
+        loop {
+            if pending.is_empty() {
+                // Nothing in-flight: run a full drain cycle. This may briefly
+                // block on writer ID init or metadata refresh — acceptable here
+                // because there are no pending responses to starve.
+                tokio::select! {
+                    result = self.prepare_sends() => {
+                        match result {
+                            Ok((futures, delay)) => {
+                                if futures.is_empty() {
+                                    // Nothing to drain. Sleep for the ready-check
+                                    // delay to avoid busy-spinning.
+                                    // TODO: add a Notify that append() signals so we
+                                    // wake immediately on new batches instead of
+                                    // polling on a timer (same as Kafka's wakeup()).
+                                    let sleep_ms = delay.unwrap_or(1);
+                                    tokio::select! {
+                                        _ = shutdown_rx.recv() => break,
+                                        _ = tokio::time::sleep(Duration::from_millis(sleep_ms)) => continue,
+                                    }
+                                }
+                                for f in futures {
+                                    pending.push(f);
+                                }
+                            }
+                            Err(e) => {
+                                warn!("Uncaught error in sender drain, continuing: {e}");
+                                tokio::select! {
+                                    _ = shutdown_rx.recv() => break,
+                                    _ = tokio::time::sleep(Duration::from_millis(1)) => continue,
+                                }
+                            }
+                        }
+                    }
+                    _ = shutdown_rx.recv() => break,
+                }
+            } else {
+                // Sends are in-flight: process responses as they arrive,
+                // then try to drain and send more batches. The prepare_sends
+                // call may briefly await on writer ID init or metadata refresh
+                // in rare cases (writer ID reset, leader change); this is
+                // acceptable because the alternative (skipping the drain)
+                // would delay recovery.

Review Comment:
   Implemented the concurrent init task approach. The drain path is now fully
synchronous, with no `.await` on the hot path. Writer-ID init and metadata
refresh run as concurrent maintenance futures in the `select!`, so
`pending.next()` is always polled while sends are in-flight.
   Also added a `Notify`-based wakeup so the sender drains immediately when
producers create or fill batches.
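
To illustrate what a synchronous drain path could look like, here is a minimal std-only sketch. It is not the code from this PR: `SenderState`, `Maintenance`, `Drain`, and `try_drain` are hypothetical names, and the rule that a missing writer ID or stale metadata blocks the drain is an assumption made for the example. The idea shown is that the drain never waits; when it cannot proceed it reports which maintenance task is needed, and the event loop can run that task concurrently while it keeps polling in-flight sends.

```rust
use std::collections::VecDeque;

/// Hypothetical maintenance work the drain path requests instead of awaiting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Maintenance {
    InitWriterId,
    RefreshMetadata,
}

/// Hypothetical stand-in for the sender's state (not the fluss-rust types).
struct SenderState {
    writer_id: Option<u64>,
    metadata_stale: bool,
    ready_batches: VecDeque<String>,
}

/// Outcome of one synchronous drain attempt.
#[derive(Debug, PartialEq, Eq)]
enum Drain {
    /// Batches that can be sent right now.
    Send(Vec<String>),
    /// Nothing can be sent until this maintenance task completes.
    Blocked(Maintenance),
    /// Nothing is queued; the loop should wait for a wakeup.
    Idle,
}

impl SenderState {
    /// Synchronous drain: returns immediately in every case.
    /// Assumption for this sketch: a missing writer ID or stale metadata
    /// blocks the drain and is reported to the caller rather than awaited.
    fn try_drain(&mut self) -> Drain {
        if self.writer_id.is_none() {
            return Drain::Blocked(Maintenance::InitWriterId);
        }
        if self.metadata_stale {
            return Drain::Blocked(Maintenance::RefreshMetadata);
        }
        if self.ready_batches.is_empty() {
            return Drain::Idle;
        }
        // Hand over everything that is ready; the caller turns each batch
        // into a send future and pushes it onto its in-flight set.
        Drain::Send(self.ready_batches.drain(..).collect())
    }
}
```

In an event loop, a `Blocked` result would translate into starting (or continuing) the matching maintenance future, and an `Idle` result into waiting on the wakeup signal, so the branch that polls pending responses is never held up by the drain itself.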



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
