fresh-borzoni commented on code in PR #404:
URL: https://github.com/apache/fluss-rust/pull/404#discussion_r2898639567
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -561,8 +776,96 @@ impl Sender {
)
}
- pub async fn close(&mut self) {
- self.running = false;
+ /// Event-loop sender: drain batches and fire RPCs into a
`FuturesUnordered`,
+ /// then process responses as they arrive. This interleaves drain cycles
with
+ /// response handling — when a fast leader responds, we immediately drain
and
+ /// send more batches for its buckets while slow leaders are still
in-flight.
+ ///
+ /// Better than Java's fire-and-forget + Netty callback threads: same I/O
+ /// overlap, but single-task cooperative multitasking — no cross-thread
+ /// synchronization needed for response handling.
+ pub async fn run_with_shutdown(&self, mut shutdown_rx: mpsc::Receiver<()>)
-> Result<()> {
+ let mut pending: FuturesUnordered<SendFuture<'_>> =
FuturesUnordered::new();
+
+ loop {
+ if pending.is_empty() {
+ // Nothing in-flight: run a full drain cycle. This may briefly
+ // block on writer ID init or metadata refresh — acceptable
here
+ // because there are no pending responses to starve.
+ tokio::select! {
+ result = self.prepare_sends() => {
+ match result {
+ Ok((futures, delay)) => {
+ if futures.is_empty() {
+ // Nothing to drain. Sleep for the
ready-check
+ // delay to avoid busy-spinning.
+ // TODO: add a Notify that append()
signals so we
+ // wake immediately on new batches instead
of
+ // polling on a timer (same as Kafka's
wakeup()).
+ let sleep_ms = delay.unwrap_or(1);
+ tokio::select! {
+ _ = shutdown_rx.recv() => break,
+ _ =
tokio::time::sleep(Duration::from_millis(sleep_ms)) => continue,
+ }
+ }
+ for f in futures {
+ pending.push(f);
+ }
+ }
+ Err(e) => {
+ warn!("Uncaught error in sender drain,
continuing: {e}");
+ tokio::select! {
+ _ = shutdown_rx.recv() => break,
+ _ =
tokio::time::sleep(Duration::from_millis(1)) => continue,
+ }
+ }
+ }
+ }
+ _ = shutdown_rx.recv() => break,
+ }
+ } else {
+ // Sends are in-flight: process responses as they arrive,
+ // then try to drain and send more batches. The prepare_sends
+ // call may briefly await on writer ID init or metadata refresh
+ // in rare cases (writer ID reset, leader change); this is
+ // acceptable because the alternative (skipping the drain)
+ // would delay recovery.
Review Comment:
Implemented the concurrent init task approach. The drain path is now fully
synchronous - no .await on the hot path. Writer-ID init and metadata refresh
run as concurrent maintenance futures in the select!, so pending.next() is
always polled when sends are in-flight.
Also added a Notify-based wakeup so the sender drains immediately when
producers create/fill batches
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]