Copilot commented on code in PR #399:
URL: https://github.com/apache/fluss-rust/pull/399#discussion_r2907739657


##########
website/docs/user-guide/cpp/api-reference.md:
##########
@@ -29,6 +29,7 @@ Complete API reference for the Fluss C++ client.
 | `scanner_remote_log_read_concurrency` | `size_t`      | `4`                  | Streaming read concurrency within a remote log file |
 | `scanner_log_max_poll_records`        | `size_t`      | `500`                | Maximum number of records returned in a single Poll() |
 | `connect_timeout_ms`                  | `uint64_t`    | `120000`             | TCP connect timeout in milliseconds |
+| `request_timeout_ms`                  | `uint64_t`    | `30000`              | Per-request RPC timeout in milliseconds |

Review Comment:
   The new `request_timeout_ms` entry is documented as a full per-request RPC 
timeout, but the current implementation only times out while awaiting the 
response after the request is sent (writes may still block beyond the timeout). 
Consider adjusting this C++ API reference text to reflect the actual timeout 
semantics.
   ```suggestion
   | `request_timeout_ms`                  | `uint64_t`    | `30000`              | Timeout in milliseconds while waiting for an RPC response after the request is sent (writes may block beyond this timeout) |
   ```
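
The distinction the comment draws (the timeout bounds only the wait for a response, not the send) can be illustrated with a small standard-library model. This is a sketch, not the fluss-rust implementation: the `request` function, its parameters, and the use of a channel in place of a socket are all assumptions made for illustration.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for an RPC call (not the fluss-rust API).
/// `write_delay` models a slow send; `reply_after` models when the
/// server answers, or `None` if it never does.
fn request(
    write_delay: Duration,
    reply_after: Option<Duration>,
    request_timeout: Duration,
) -> (Result<u32, RecvTimeoutError>, Duration) {
    let started = Instant::now();
    let (tx, rx) = mpsc::channel::<u32>();

    // Send phase: modeled as a plain sleep, outside the timeout.
    thread::sleep(write_delay);

    // "Server": replies after `reply_after`, or never when it is `None`.
    let reply_tx = tx.clone();
    let server = reply_after.map(|delay| {
        thread::spawn(move || {
            thread::sleep(delay);
            let _ = reply_tx.send(42);
        })
    });

    // Response-wait phase: the only part bounded by `request_timeout`.
    let result = rx.recv_timeout(request_timeout);

    if let Some(handle) = server {
        let _ = handle.join();
    }
    // Keep one sender alive until here so a silent server shows up as a
    // timeout rather than a disconnected channel.
    drop(tx);
    (result, started.elapsed())
}
```

In this model, a call with a 100 ms `write_delay`, no reply, and a 50 ms `request_timeout` still reports a timeout, but only after more than 100 ms have passed. That is the behavior the suggested wording warns users about.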



##########
website/docs/user-guide/python/api-reference.md:
##########
@@ -22,6 +22,7 @@ Complete API reference for the Fluss Python client.
 | `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
 | `scanner_log_max_poll_records`        | `scanner.log.max-poll-records`        | Get/set max number of records returned in a single poll() |
 | `connect_timeout_ms`                  | `connect-timeout`                     | Get/set TCP connect timeout in milliseconds |
+| `request_timeout_ms`                  | `request-timeout`                     | Get/set per-request RPC timeout in milliseconds |

Review Comment:
   The new `request_timeout_ms` entry is documented as a general per-request 
RPC timeout, but the Rust implementation only times out the response-wait 
portion (send/write can still exceed the duration). Consider clarifying this in 
the Python API reference (or documenting the exact semantics) so users don't 
assume the timeout covers the full end-to-end RPC.
   ```suggestion
   | `request_timeout_ms`                  | `request-timeout`                     | Get/set max time in ms to wait for an RPC response after the request is sent (does not limit time spent sending the request) |
   ```



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +602,72 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::TablePath;
+    use crate::rpc::message::TableExistsRequest;
+
+    #[tokio::test]
+    async fn test_request_timeout() {
+        // Create a duplex stream where the "server" side never responds.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            Some(Duration::from_millis(50)),
+        );

Review Comment:
   This test uses a very small request timeout (50ms). On slow/heavily loaded 
CI runners this can be flaky due to timer scheduling delays. Consider using 
Tokio's paused time (`#[tokio::test(start_paused = true)]` + 
`tokio::time::advance`) or increasing the timeout enough to be robust.
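
Tokio's paused clock is the direct fix in this codebase. The idea behind it, that a test advances time explicitly instead of waiting on the wall clock, can be sketched without any runtime. Everything below (`FakeClock`, `PendingRequest`, and their methods) is hypothetical and exists only to show why such a test cannot be affected by scheduler delays.

```rust
use std::cell::Cell;
use std::time::Duration;

/// Hypothetical virtual clock: time moves only when the test advances it.
struct FakeClock {
    now: Cell<Duration>,
}

impl FakeClock {
    fn new() -> Self {
        Self { now: Cell::new(Duration::ZERO) }
    }

    fn now(&self) -> Duration {
        self.now.get()
    }

    fn advance(&self, by: Duration) {
        self.now.set(self.now.get() + by);
    }
}

/// Hypothetical in-flight request with an optional deadline on the
/// virtual clock. `None` models "no request timeout configured".
struct PendingRequest {
    deadline: Option<Duration>,
}

impl PendingRequest {
    fn new(clock: &FakeClock, timeout: Option<Duration>) -> Self {
        Self { deadline: timeout.map(|t| clock.now() + t) }
    }

    /// True once the virtual clock has reached the deadline.
    fn timed_out(&self, clock: &FakeClock) -> bool {
        self.deadline.map_or(false, |d| clock.now() >= d)
    }
}
```

Because the clock only changes inside `advance`, a 50 ms timeout can be checked at exactly 49 ms and 50 ms of virtual time, and the no-timeout case can be checked after an arbitrarily large advance, regardless of how loaded the CI runner is.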



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +602,72 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::TablePath;
+    use crate::rpc::message::TableExistsRequest;
+
+    #[tokio::test]
+    async fn test_request_timeout() {
+        // Create a duplex stream where the "server" side never responds.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            Some(Duration::from_millis(50)),
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let result = conn.request(request).await;
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(
+            matches!(
+                err,
+                Error::RpcError {
+                    source: RpcError::RequestTimeout { .. },
+                    ..
+                }
+            ),
+            "expected RequestTimeout, got: {err}"
+        );
+
+        // Timeout must not poison the connection — other requests should 
still work.
+        assert!(!conn.is_poisoned());
+
+        // The timed-out request must be removed from the request map (no state leak).
+        if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() {
+            assert!(map.is_empty(), "request map should be empty after timeout");
+        } else {
+            panic!("connection should not be poisoned after a timeout");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_request_no_timeout() {
+        // With no request timeout configured, request should remain pending
+        // when the server does not respond.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            None,
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await;
+        assert!(
+            pending.is_err(),
+            "expected request to remain pending without per-request timeout"
+        );

Review Comment:
   This assertion depends on a 50ms wall-clock timeout to ensure the future 
stays pending. That can be flaky on slow CI (timeout may elapse late/early 
relative to scheduling). Using paused time or a larger timeout would make the 
test more deterministic.



##########
website/docs/user-guide/rust/api-reference.md:
##########
@@ -21,6 +21,7 @@ Complete API reference for the Fluss Rust client.
 | `scanner_remote_log_read_concurrency` | `usize`         | `4`                | Streaming read concurrency within a remote log file |
 | `scanner_log_max_poll_records`        | `usize`         | `500`              | Maximum number of records returned in a single poll() |
 | `connect_timeout_ms`                  | `u64`           | `120000`           | TCP connect timeout in milliseconds |
+| `request_timeout_ms`                  | `u64`           | `30000`            | Per-request RPC timeout in milliseconds |

Review Comment:
   The new `request_timeout_ms` docs describe this as a generic "Per-request 
RPC timeout", but the implementation currently only applies the timeout to the 
response-wait phase (writes can still block longer). It would be helpful to 
clarify that nuance in this user-facing API reference to avoid over-promising 
the timeout semantics.
   ```suggestion
   | `request_timeout_ms`                  | `u64`           | `30000`            | Timeout in ms for waiting on RPC responses; request writes may block longer |
   ```



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -80,14 +81,20 @@ impl RpcClient {
         RpcClient {
             connections: Default::default(),
             client_id: Arc::from(""),
-            timeout: None,
+            connect_timeout: None,
+            request_timeout: None,
             max_message_size: usize::MAX,
             sasl_config: None,
         }
     }
 
-    pub fn with_timeout(mut self, timeout: Duration) -> Self {
-        self.timeout = Some(timeout);
+    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
+        self.connect_timeout = Some(timeout);
+        self
+    }
+
+    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
+        self.request_timeout = Some(timeout);
         self

Review Comment:
   `RpcClient::with_timeout` was removed/renamed to `with_connect_timeout`, but 
`RpcClient` is publicly re-exported (`crates/fluss/src/rpc/mod.rs`), so this is 
a breaking API change for downstream users. Consider keeping `with_timeout` as 
a (deprecated) wrapper around `with_connect_timeout` to preserve compatibility, 
or clearly document the breaking change in the public API/changelog.
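
One way the compatibility wrapper could look is sketched below. The struct is a simplified stand-in that keeps only the two timeout fields from the diff; the real `RpcClient` has more fields, and the `new` constructor and the deprecation note text here are assumptions.

```rust
use std::time::Duration;

/// Simplified stand-in for `RpcClient`, reduced to the two timeout fields.
#[derive(Debug, Default)]
pub struct RpcClient {
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
}

impl RpcClient {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = Some(timeout);
        self
    }

    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = Some(timeout);
        self
    }

    /// Old name kept so existing callers still compile; it forwards to
    /// `with_connect_timeout` and leaves `request_timeout` untouched.
    #[deprecated(note = "use `with_connect_timeout` instead")]
    pub fn with_timeout(self, timeout: Duration) -> Self {
        self.with_connect_timeout(timeout)
    }
}
```

Downstream code that calls `with_timeout` would keep compiling and get a deprecation warning that points at the new name, which gives users a release cycle to migrate before the old method is removed.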



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
