Copilot commented on code in PR #369:
URL: https://github.com/apache/fluss-rust/pull/369#discussion_r2898969511


##########
crates/fluss/src/client/admin.rs:
##########
@@ -37,31 +37,29 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tokio::task::JoinHandle;
 
+#[derive(Clone)]
 pub struct FlussAdmin {
-    admin_gateway: ServerConnection,
-    #[allow(dead_code)]
     metadata: Arc<Metadata>,
-    #[allow(dead_code)]
     rpc_client: Arc<RpcClient>,
 }
 
 impl FlussAdmin {
-    pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> 
Result<Self> {
-        let admin_con =
-            connections
-                
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
-                    || Error::UnexpectedError {
-                        message: "Coordinator server not found in cluster 
metadata".to_string(),
-                        source: None,
-                    },
-                )?)
-                .await?;
-
-        Ok(FlussAdmin {
-            admin_gateway: admin_con,
+    pub fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Self {
+        FlussAdmin {
             metadata,
             rpc_client: connections,
-        })
+        }
+    }

Review Comment:
   `FlussAdmin::new` changed from an `async fn` returning `Result<Self>` to a 
sync constructor returning `Self`. Since `FlussAdmin` is a public type, this is 
an API-breaking change for downstream crates and also changes when/where 
initialization errors surface (constructor can no longer fail).
   
   Consider keeping the existing public constructor signature (or adding a new 
`new_unchecked`/`from_parts` constructor for internal use) so callers depending 
on fallible/async initialization don’t break.



##########
crates/fluss/src/client/connection.rs:
##########
@@ -76,7 +77,23 @@ impl FlussConnection {
     }
 
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
-        FlussAdmin::new(self.network_connects.clone(), 
self.metadata.clone()).await
+        // 1. Fast path: return cached instance if already initialized.
+        if let Some(admin) = self.admin_client.read().as_ref() {
+            return Ok(admin.clone());
+        }
+
+        // 2. Slow path: acquire write lock.
+        let mut admin_guard = self.admin_client.write();
+
+        // 3. Double-check: another thread may have initialized while we 
waited.
+        if let Some(admin) = admin_guard.as_ref() {
+            return Ok(admin.clone());
+        }
+
+        // 4. Initialize and cache.
+        let admin = FlussAdmin::new(self.network_connects.clone(), 
self.metadata.clone());
+        *admin_guard = Some(admin.clone());
+        Ok(admin)

Review Comment:
   `get_admin()` now always succeeds and caches a `FlussAdmin` without 
verifying that the coordinator exists / is reachable. This is a behavior 
change: previously `get_admin()` could fail early (e.g., missing coordinator in 
metadata), and the integration readiness check in `tests/integration/utils.rs` 
relies on `get_admin().await.is_ok()` as part of cluster readiness.
   
   If `get_admin()` is intended to remain a readiness/validation point, 
consider performing a lightweight async validation (e.g., resolve coordinator + 
`RpcClient::get_connection`) before caching/returning. Since that introduces an 
`await`, prefer an async single-init primitive (`tokio::sync::OnceCell` / async 
mutex) rather than holding a `parking_lot` lock across an await.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to