andygrove commented on code in PR #1578:
URL: https://github.com/apache/datafusion-ballista/pull/1578#discussion_r3150510976


##########
ballista/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -95,18 +97,48 @@ impl ShuffleReaderExec {
             metrics: ExecutionPlanMetricsSet::new(),
             properties,
             work_dir: None, // to be updated at the executor side
+            client_pool: None,
         })
     }
 
     /// changes work dir where shuffle files are located
-    pub fn change_work_dir(&self, work_dir: String) -> Self {
+    pub fn with_work_dir(&self, work_dir: String) -> Self {

Review Comment:
   Small thought on the rename from `change_work_dir`. Since this is `pub` on a 
public struct, custom `ExecutionEngine` impls in downstream projects may be 
calling it. Would it be worth keeping a `#[deprecated]` shim under the old name 
that forwards to `with_work_dir`? That way folks get a warning instead of a 
hard compile error.
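
A rough sketch of what such a shim might look like. `ShuffleReader` below is a simplified stand-in for `ShuffleReaderExec` (only the `work_dir` field is modeled, and the `work_dir()` accessor is hypothetical), so treat it as an illustration rather than the actual change:

```rust
/// Hypothetical, stripped-down stand-in for `ShuffleReaderExec`.
#[derive(Debug, Clone, Default)]
pub struct ShuffleReader {
    work_dir: Option<String>,
}

impl ShuffleReader {
    /// New name: returns a copy of the reader pointing at `work_dir`.
    pub fn with_work_dir(&self, work_dir: String) -> Self {
        Self {
            work_dir: Some(work_dir),
        }
    }

    /// Old name, kept so existing callers get a warning instead of an error.
    #[deprecated(note = "use `with_work_dir` instead")]
    pub fn change_work_dir(&self, work_dir: String) -> Self {
        self.with_work_dir(work_dir)
    }

    /// Hypothetical accessor, present only so the sketch can be checked.
    pub fn work_dir(&self) -> Option<&str> {
        self.work_dir.as_deref()
    }
}
```

Downstream callers of `change_work_dir` would then see a deprecation warning pointing at `with_work_dir`, and the shim could be removed in a later release.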



##########
ballista/executor/src/executor.rs:
##########
@@ -127,7 +127,30 @@ impl Executor {
         function_registry: Arc<BallistaFunctionRegistry>,
         metrics_collector: Arc<dyn ExecutorMetricsCollector>,
         concurrent_tasks: usize,
-        execution_engine: Option<Arc<dyn ExecutionEngine>>,
+        execution_engine: Arc<dyn ExecutionEngine>,

Review Comment:
   Nice work splitting out `with_default_execution_engine`. One thing worth 
checking. This changes `Executor::new` from `Option<Arc<dyn ExecutionEngine>>` 
to `Arc<dyn ExecutionEngine>`. Anyone downstream calling `Executor::new(..., 
None)` will hit a compile error once this lands. Could we keep the old 
signature and have `with_default_execution_engine` be purely additive? If we do 
want the breaking change, it might be worth flagging the PR with the `api 
change` label.
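
To illustrate the additive option, here is a minimal sketch. The `Executor` and `ExecutionEngine` types are simplified stand-ins with most constructor arguments omitted, and `DefaultEngine`, `engine_name`, and `concurrent_tasks()` are hypothetical names introduced only for this example:

```rust
use std::sync::Arc;

/// Simplified stand-in for the real `ExecutionEngine` trait.
pub trait ExecutionEngine: Send + Sync {
    fn name(&self) -> &'static str;
}

/// Hypothetical default engine used when the caller passes `None`.
struct DefaultEngine;

impl ExecutionEngine for DefaultEngine {
    fn name(&self) -> &'static str {
        "default"
    }
}

/// Simplified stand-in for `Executor`.
pub struct Executor {
    concurrent_tasks: usize,
    execution_engine: Arc<dyn ExecutionEngine>,
}

impl Executor {
    /// Old signature, unchanged: `None` still selects the default engine.
    pub fn new(
        concurrent_tasks: usize,
        execution_engine: Option<Arc<dyn ExecutionEngine>>,
    ) -> Self {
        let execution_engine: Arc<dyn ExecutionEngine> = match execution_engine {
            Some(engine) => engine,
            None => Arc::new(DefaultEngine),
        };
        Self {
            concurrent_tasks,
            execution_engine,
        }
    }

    /// Purely additive convenience constructor that forwards to `new`.
    pub fn with_default_execution_engine(concurrent_tasks: usize) -> Self {
        Self::new(concurrent_tasks, None)
    }

    /// Hypothetical accessors, present only so the sketch can be checked.
    pub fn engine_name(&self) -> &'static str {
        self.execution_engine.name()
    }

    pub fn concurrent_tasks(&self) -> usize {
        self.concurrent_tasks
    }
}
```

With this shape, existing `Executor::new(..., None)` call sites keep compiling, and new code can opt into the shorter constructor.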



##########
ballista/core/src/client_pool.rs:
##########
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Connection pool for `BallistaClient`` instances.
+//!
+//! `DefaultBallistaClientPool` maintains a `VecDeque`` of idle clients per
+//! `(host, port)` key backed by a `DashMap`. Callers `BallistaClientPool::acquire` a
+//! `PooledClient` guard; when the guard is dropped the underlying client is
+//! returned to the idle deque automatically.
+//!
+//! If the connection errored, call`PooledClient::discard` before dropping so
+//! the pool closes the channel rather than reusing it.
+//!
+//! A background tokio task evicts idle connections that have not been returned
+//! within the configured `idle_timeout`.
+
+use crate::client::BallistaClient;
+use crate::error::Result;
+use crate::extension::BallistaConfigGrpcEndpoint;
+use crate::utils::GrpcClientConfig;
+use async_trait::async_trait;
+use dashmap::DashMap;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Weak};
+use std::time::{Duration, Instant};
+
+// ---------------------------------------------------------------------------
+// PooledClient guard
+// ---------------------------------------------------------------------------
+
+/// A [BallistaClient] checked out from a pool.
+///
+/// Implements [Deref] / [DerefMut] so it can be used exactly like a
+/// [BallistaClient]. On drop, the inner client is returned to the pool
+/// automatically. Call [PooledClient::discard] before dropping if the
+/// connection should **not** be reused (e.g. after a transport error).
+pub struct PooledClient {
+    client: BallistaClient,
+    /// Invoked in `Drop::drop` to push the client back into the idle deque.
+    /// `None` after `discard()` is called.
+    return_fn: Option<Box<dyn FnOnce(BallistaClient) + Send>>,
+}
+
+impl PooledClient {
+    pub(crate) fn new(
+        client: BallistaClient,
+        return_fn: Box<dyn FnOnce(BallistaClient) + Send>,
+    ) -> Self {
+        Self {
+            client,
+            return_fn: Some(return_fn),
+        }
+    }
+
+    /// Close the connection instead of returning it to the pool.
+    pub fn discard(mut self) {
+        self.return_fn = None;
+    }
+}
+
+impl Deref for PooledClient {
+    type Target = BallistaClient;
+    fn deref(&self) -> &Self::Target {
+        &self.client
+    }
+}
+
+impl DerefMut for PooledClient {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.client
+    }
+}
+
+impl Drop for PooledClient {
+    fn drop(&mut self) {
+        if let Some(f) = self.return_fn.take() {
+            let client = self.client.clone();
+            f(client);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Trait
+// ---------------------------------------------------------------------------
+
+/// Manages a pool of reusable [BallistaClient] connections.
+#[async_trait]
+pub trait BallistaClientPool: Send + Sync + Debug {
+    /// Acquire an idle client for `(host, port)`, or create a new one if the
+    /// pool is empty for that key. The returned [PooledClient] returns itself
+    /// to the pool on drop.
+    async fn acquire(
+        &self,
+        host: &str,
+        port: u16,
+        config: &GrpcClientConfig,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    ) -> Result<PooledClient>;
+
+    /// Remove all idle clients that have been sitting unused longer than the
+    /// configured idle timeout. Called automatically by the background task in
+    /// [DefaultBallistaClientPool]; can also be invoked on demand.
+    async fn evict_idle(&self);
+}
+
+// ---------------------------------------------------------------------------
+// DefaultBallistaClientPool
+// ---------------------------------------------------------------------------
+
+struct IdleEntry {
+    client: BallistaClient,
+    idle_since: Instant,
+}
+
+type IdleMap = DashMap<(String, u16), VecDeque<IdleEntry>>;
+
+struct Inner {
+    idle: IdleMap,
+    idle_timeout: Duration,
+}
+
+/// Default pool implementation.
+///
+/// Keeps a `VecDeque<BallistaClient>` per `(host, port)`. Idle clients are
+/// evicted by a background tokio task that runs at `idle_timeout / 3`
+/// intervals (minimum 15 s). The task exits automatically when the pool `Arc`
+/// is dropped.
+
+#[derive(Clone)]
+pub struct DefaultBallistaClientPool {
+    inner: Arc<Inner>,
+}
+
+impl Debug for DefaultBallistaClientPool {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DefaultBallistaClientPool").finish()
+    }
+}
+
+impl DefaultBallistaClientPool {
+    /// Create a pool that evicts connections idle longer than `idle_timeout`,
+    pub fn with_eviction_thread(idle_timeout: Duration) -> Self {

Review Comment:
   This doc comment looks truncated: it ends on a comma with nothing after it, 
maybe an in-progress copy of the doc on `new()` below? It could be completed 
with something like 'using a background eviction thread' to round out the 
sentence.



##########
ballista/core/src/client_pool.rs:
##########
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Connection pool for `BallistaClient`` instances.
+//!
+//! `DefaultBallistaClientPool` maintains a `VecDeque`` of idle clients per
+//! `(host, port)` key backed by a `DashMap`. Callers `BallistaClientPool::acquire` a
+//! `PooledClient` guard; when the guard is dropped the underlying client is
+//! returned to the idle deque automatically.
+//!
+//! If the connection errored, call`PooledClient::discard` before dropping so
+//! the pool closes the channel rather than reusing it.
+//!
+//! A background tokio task evicts idle connections that have not been returned
+//! within the configured `idle_timeout`.
+
+use crate::client::BallistaClient;
+use crate::error::Result;
+use crate::extension::BallistaConfigGrpcEndpoint;
+use crate::utils::GrpcClientConfig;
+use async_trait::async_trait;
+use dashmap::DashMap;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Weak};
+use std::time::{Duration, Instant};
+
+// ---------------------------------------------------------------------------
+// PooledClient guard
+// ---------------------------------------------------------------------------
+
+/// A [BallistaClient] checked out from a pool.
+///
+/// Implements [Deref] / [DerefMut] so it can be used exactly like a
+/// [BallistaClient]. On drop, the inner client is returned to the pool
+/// automatically. Call [PooledClient::discard] before dropping if the
+/// connection should **not** be reused (e.g. after a transport error).
+pub struct PooledClient {
+    client: BallistaClient,
+    /// Invoked in `Drop::drop` to push the client back into the idle deque.
+    /// `None` after `discard()` is called.
+    return_fn: Option<Box<dyn FnOnce(BallistaClient) + Send>>,
+}
+
+impl PooledClient {
+    pub(crate) fn new(
+        client: BallistaClient,
+        return_fn: Box<dyn FnOnce(BallistaClient) + Send>,
+    ) -> Self {
+        Self {
+            client,
+            return_fn: Some(return_fn),
+        }
+    }
+
+    /// Close the connection instead of returning it to the pool.
+    pub fn discard(mut self) {
+        self.return_fn = None;
+    }
+}
+
+impl Deref for PooledClient {
+    type Target = BallistaClient;
+    fn deref(&self) -> &Self::Target {
+        &self.client
+    }
+}
+
+impl DerefMut for PooledClient {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.client
+    }
+}
+
+impl Drop for PooledClient {
+    fn drop(&mut self) {
+        if let Some(f) = self.return_fn.take() {
+            let client = self.client.clone();
+            f(client);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Trait
+// ---------------------------------------------------------------------------
+
+/// Manages a pool of reusable [BallistaClient] connections.
+#[async_trait]
+pub trait BallistaClientPool: Send + Sync + Debug {
+    /// Acquire an idle client for `(host, port)`, or create a new one if the
+    /// pool is empty for that key. The returned [PooledClient] returns itself
+    /// to the pool on drop.
+    async fn acquire(
+        &self,
+        host: &str,
+        port: u16,
+        config: &GrpcClientConfig,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    ) -> Result<PooledClient>;
+
+    /// Remove all idle clients that have been sitting unused longer than the
+    /// configured idle timeout. Called automatically by the background task in
+    /// [DefaultBallistaClientPool]; can also be invoked on demand.
+    async fn evict_idle(&self);
+}
+
+// ---------------------------------------------------------------------------
+// DefaultBallistaClientPool
+// ---------------------------------------------------------------------------
+
+struct IdleEntry {
+    client: BallistaClient,
+    idle_since: Instant,
+}
+
+type IdleMap = DashMap<(String, u16), VecDeque<IdleEntry>>;
+
+struct Inner {
+    idle: IdleMap,
+    idle_timeout: Duration,
+}
+
+/// Default pool implementation.
+///
+/// Keeps a `VecDeque<BallistaClient>` per `(host, port)`. Idle clients are
+/// evicted by a background tokio task that runs at `idle_timeout / 3`
+/// intervals (minimum 15 s). The task exits automatically when the pool `Arc`
+/// is dropped.
+
+#[derive(Clone)]
+pub struct DefaultBallistaClientPool {
+    inner: Arc<Inner>,
+}
+
+impl Debug for DefaultBallistaClientPool {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DefaultBallistaClientPool").finish()
+    }
+}
+
+impl DefaultBallistaClientPool {
+    /// Create a pool that evicts connections idle longer than `idle_timeout`,
+    pub fn with_eviction_thread(idle_timeout: Duration) -> Self {
+        Self::new(idle_timeout, true)
+    }
+
+    /// Create a pool that evicts connections idle longer than `idle_timeout`,
+    /// if `enable_eviction_thread` is enabled
+    pub fn new(idle_timeout: Duration, enable_eviction_thread: bool) -> Self {
+        let inner = Arc::new(Inner {
+            idle: DashMap::new(),
+            idle_timeout,
+        });
+
+        let weak: Weak<Inner> = Arc::downgrade(&inner);
+        // TODO: do we limit minimum interval here?
+        let check_interval = Duration::from_secs((idle_timeout.as_secs() / 3).max(15));
+
+        if enable_eviction_thread {
+            tokio::spawn(async move {
+                log::debug!(
+                    "client connection pool - eviction thread started ... interval: {check_interval:?}"
+                );
+                let mut ticker = tokio::time::interval(check_interval);
+                loop {
+                    ticker.tick().await;
+
+                    match weak.upgrade() {
+                        None => break,
+                        Some(pool) => {
+                            log::trace!("client connection pool - evicting connections");
+                            evict(&pool.idle, pool.idle_timeout)
+                        }
+                    }
+                }
+                log::debug!("client connection pool - eviction thread ... DONE");
+            });
+        }
+
+        Self { inner }
+    }
+
+    /// Total number of idle connections currently held across all endpoints.
+    pub fn idle_count(&self) -> usize {
+        self.inner.idle.iter().map(|e| e.value().len()).sum()
+    }
+}
+
+fn evict(idle: &IdleMap, timeout: Duration) {
+    let deadline = Instant::now()
+        .checked_sub(timeout)
+        .unwrap_or_else(Instant::now);
+
+    // Drain expired entries from the front of each deque (oldest = front).
+    idle.retain(|_, deque| {
+        while deque.front().is_some_and(|e| e.idle_since <= deadline) {

Review Comment:
   Tiny boundary thing. The trait doc above (`evict_idle`) says 'sitting unused 
longer than the configured idle timeout', but `idle_since <= deadline` will 
also evict entries that are exactly at the timeout. Should this be a strict `<` 
to match the wording?
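
To make the boundary case concrete, here is a small sketch that compares the two operators on a deque of timestamps. `evict_front` and its `strict` flag are hypothetical names for this illustration, and unlike the PR's `evict` it simply evicts nothing when the deadline cannot be computed:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Pops expired timestamps from the front (oldest first) and returns how many
/// were removed. With `strict = true` only entries idle strictly longer than
/// `timeout` are removed; with `strict = false` entries idle for exactly
/// `timeout` are removed as well.
fn evict_front(
    deque: &mut VecDeque<Instant>,
    now: Instant,
    timeout: Duration,
    strict: bool,
) -> usize {
    // Simplification: if `now - timeout` is not representable, evict nothing.
    let Some(deadline) = now.checked_sub(timeout) else {
        return 0;
    };
    let mut evicted = 0;
    while deque.front().is_some_and(|idle_since| {
        if strict {
            *idle_since < deadline
        } else {
            *idle_since <= deadline
        }
    }) {
        deque.pop_front();
        evicted += 1;
    }
    evicted
}
```

For entries that have been idle for 50 s, exactly 30 s, and 10 s with a 30 s timeout, the `<=` variant removes the first two while the strict `<` variant removes only the first.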



##########
ballista/core/Cargo.toml:
##########
@@ -49,6 +49,7 @@ async-trait = { workspace = true }
 aws-config = { version = "1.8.16", optional = true }
 aws-credential-types = { version = "1.2.0", optional = true }
 chrono = { version = "0.4", default-features = false }
+dashmap = { workspace = true }

Review Comment:
   Since pooling is opt-in at runtime (off by default via `--connection-cache 
0`), would it make sense to gate `dashmap` and the new `client_pool` module 
behind a cargo feature? That way `--no-default-features` builds stay lean and 
folks who never enable pooling do not pull in dashmap.
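
A sketch of how the gating might look, assuming a feature named `client-pool` (the feature name and the helper functions below are hypothetical, and the manifest lines are shown as comments):

```rust
// Cargo.toml, assuming a hypothetical `client-pool` feature:
//
//   [features]
//   client-pool = ["dep:dashmap"]
//
//   [dependencies]
//   dashmap = { workspace = true, optional = true }

/// Compiled only when the hypothetical `client-pool` feature is enabled.
/// The real module would hold the `DashMap`-backed pool.
#[cfg(feature = "client-pool")]
pub mod client_pool {
    pub fn pooling_compiled_in() -> bool {
        true
    }
}

/// Lets callers check at runtime whether pooling was compiled in, for example
/// to reject a non-zero `--connection-cache` value on a lean build.
pub fn pooling_available() -> bool {
    cfg!(feature = "client-pool")
}
```

The trade-off is an extra feature combination to build and test, so this may only be worth it if keeping `dashmap` out of lean builds matters to downstream users.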



##########
ballista/core/src/client_pool.rs:
##########
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Connection pool for `BallistaClient`` instances.
+//!
+//! `DefaultBallistaClientPool` maintains a `VecDeque`` of idle clients per
+//! `(host, port)` key backed by a `DashMap`. Callers `BallistaClientPool::acquire` a
+//! `PooledClient` guard; when the guard is dropped the underlying client is
+//! returned to the idle deque automatically.
+//!
+//! If the connection errored, call`PooledClient::discard` before dropping so
+//! the pool closes the channel rather than reusing it.
+//!
+//! A background tokio task evicts idle connections that have not been returned
+//! within the configured `idle_timeout`.
+
+use crate::client::BallistaClient;
+use crate::error::Result;
+use crate::extension::BallistaConfigGrpcEndpoint;
+use crate::utils::GrpcClientConfig;
+use async_trait::async_trait;
+use dashmap::DashMap;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Weak};
+use std::time::{Duration, Instant};
+
+// ---------------------------------------------------------------------------
+// PooledClient guard
+// ---------------------------------------------------------------------------
+
+/// A [BallistaClient] checked out from a pool.
+///
+/// Implements [Deref] / [DerefMut] so it can be used exactly like a
+/// [BallistaClient]. On drop, the inner client is returned to the pool
+/// automatically. Call [PooledClient::discard] before dropping if the
+/// connection should **not** be reused (e.g. after a transport error).
+pub struct PooledClient {
+    client: BallistaClient,
+    /// Invoked in `Drop::drop` to push the client back into the idle deque.
+    /// `None` after `discard()` is called.
+    return_fn: Option<Box<dyn FnOnce(BallistaClient) + Send>>,
+}
+
+impl PooledClient {
+    pub(crate) fn new(
+        client: BallistaClient,
+        return_fn: Box<dyn FnOnce(BallistaClient) + Send>,
+    ) -> Self {
+        Self {
+            client,
+            return_fn: Some(return_fn),
+        }
+    }
+
+    /// Close the connection instead of returning it to the pool.
+    pub fn discard(mut self) {
+        self.return_fn = None;
+    }
+}
+
+impl Deref for PooledClient {
+    type Target = BallistaClient;
+    fn deref(&self) -> &Self::Target {
+        &self.client
+    }
+}
+
+impl DerefMut for PooledClient {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.client
+    }
+}
+
+impl Drop for PooledClient {
+    fn drop(&mut self) {
+        if let Some(f) = self.return_fn.take() {
+            let client = self.client.clone();
+            f(client);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Trait
+// ---------------------------------------------------------------------------
+
+/// Manages a pool of reusable [BallistaClient] connections.
+#[async_trait]
+pub trait BallistaClientPool: Send + Sync + Debug {
+    /// Acquire an idle client for `(host, port)`, or create a new one if the
+    /// pool is empty for that key. The returned [PooledClient] returns itself
+    /// to the pool on drop.
+    async fn acquire(
+        &self,
+        host: &str,
+        port: u16,
+        config: &GrpcClientConfig,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    ) -> Result<PooledClient>;
+
+    /// Remove all idle clients that have been sitting unused longer than the
+    /// configured idle timeout. Called automatically by the background task in
+    /// [DefaultBallistaClientPool]; can also be invoked on demand.
+    async fn evict_idle(&self);
+}
+
+// ---------------------------------------------------------------------------
+// DefaultBallistaClientPool
+// ---------------------------------------------------------------------------
+
+struct IdleEntry {
+    client: BallistaClient,
+    idle_since: Instant,
+}
+
+type IdleMap = DashMap<(String, u16), VecDeque<IdleEntry>>;
+
+struct Inner {
+    idle: IdleMap,
+    idle_timeout: Duration,
+}
+
+/// Default pool implementation.
+///
+/// Keeps a `VecDeque<BallistaClient>` per `(host, port)`. Idle clients are
+/// evicted by a background tokio task that runs at `idle_timeout / 3`
+/// intervals (minimum 15 s). The task exits automatically when the pool `Arc`
+/// is dropped.
+
+#[derive(Clone)]
+pub struct DefaultBallistaClientPool {
+    inner: Arc<Inner>,
+}
+
+impl Debug for DefaultBallistaClientPool {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DefaultBallistaClientPool").finish()
+    }
+}
+
+impl DefaultBallistaClientPool {
+    /// Create a pool that evicts connections idle longer than `idle_timeout`,
+    pub fn with_eviction_thread(idle_timeout: Duration) -> Self {
+        Self::new(idle_timeout, true)
+    }
+
+    /// Create a pool that evicts connections idle longer than `idle_timeout`,
+    /// if `enable_eviction_thread` is enabled
+    pub fn new(idle_timeout: Duration, enable_eviction_thread: bool) -> Self {
+        let inner = Arc::new(Inner {
+            idle: DashMap::new(),
+            idle_timeout,
+        });
+
+        let weak: Weak<Inner> = Arc::downgrade(&inner);
+        // TODO: do we limit minimum interval here?
+        let check_interval = Duration::from_secs((idle_timeout.as_secs() / 3).max(15));
+
+        if enable_eviction_thread {
+            tokio::spawn(async move {
+                log::debug!(
+                    "client connection pool - eviction thread started ... interval: {check_interval:?}"
+                );
+                let mut ticker = tokio::time::interval(check_interval);
+                loop {
+                    ticker.tick().await;
+
+                    match weak.upgrade() {
+                        None => break,
+                        Some(pool) => {
+                            log::trace!("client connection pool - evicting connections");
+                            evict(&pool.idle, pool.idle_timeout)
+                        }
+                    }
+                }
+                log::debug!("client connection pool - eviction thread ... DONE");
+            });
+        }
+
+        Self { inner }
+    }
+
+    /// Total number of idle connections currently held across all endpoints.
+    pub fn idle_count(&self) -> usize {
+        self.inner.idle.iter().map(|e| e.value().len()).sum()
+    }
+}
+
+fn evict(idle: &IdleMap, timeout: Duration) {
+    let deadline = Instant::now()
+        .checked_sub(timeout)
+        .unwrap_or_else(Instant::now);
+
+    // Drain expired entries from the front of each deque (oldest = front).
+    idle.retain(|_, deque| {
+        while deque.front().is_some_and(|e| e.idle_since <= deadline) {
+            deque.pop_front();
+        }
+        !deque.is_empty()
+    });
+}
+
+#[async_trait]
+impl BallistaClientPool for DefaultBallistaClientPool {
+    async fn acquire(
+        &self,
+        host: &str,
+        port: u16,
+        config: &GrpcClientConfig,
+        customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
+    ) -> Result<PooledClient> {
+        let key = (host.to_string(), port);
+
+        // Pop the most-recently-used idle client. The DashMap shard lock is
+        // held only for the duration of the pop — released before the async
+        // BallistaClient::try_new call below.
+        let maybe_idle = self
+            .inner
+            .idle
+            .get_mut(&key)
+            .and_then(|mut deque| deque.pop_back())
+            .map(|e| e.client);
+
+        let client = match maybe_idle {
+            Some(c) => {

Review Comment:
   Question about cached entries. The pool keys on `(host, port)`, but 
`GrpcClientConfig` carries values like `io_retries_times` and 
`io_retry_wait_time_ms` that became per-session-config in #1577. If those 
settings change between task runs, a recycled idle client will still be using 
whatever was in effect when it was first created. Is that something we should 
worry about in practice, or do those values rarely vary at runtime?
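
If it does turn out to matter, one possible direction is to fold the relevant settings into the pool key so a client is only reused under the settings it was created with. A minimal single-threaded sketch, where `PoolKey` and `IdlePool` are hypothetical names and the field types are assumptions rather than the real `GrpcClientConfig` definition:

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical pool key: the endpoint plus the settings baked into a client.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PoolKey {
    pub host: String,
    pub port: u16,
    // Field types are assumptions; the real config struct may differ.
    pub io_retries_times: usize,
    pub io_retry_wait_time_ms: u64,
}

/// Minimal idle pool, generic over the client type `C`.
pub struct IdlePool<C> {
    idle: HashMap<PoolKey, VecDeque<C>>,
}

impl<C> IdlePool<C> {
    pub fn new() -> Self {
        Self {
            idle: HashMap::new(),
        }
    }

    /// Return a client to the deque for the key it was created under.
    pub fn release(&mut self, key: PoolKey, client: C) {
        self.idle.entry(key).or_default().push_back(client);
    }

    /// Take the most recently returned client whose key matches exactly.
    pub fn acquire(&mut self, key: &PoolKey) -> Option<C> {
        self.idle.get_mut(key)?.pop_back()
    }
}
```

The cost is lower reuse when sessions use many distinct settings, so this is only a trade-off to consider, not a recommendation.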



##########
ballista/core/src/client_pool.rs:
##########
@@ -0,0 +1,426 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Connection pool for `BallistaClient`` instances.
+//!
+//! `DefaultBallistaClientPool` maintains a `VecDeque`` of idle clients per
+//! `(host, port)` key backed by a `DashMap`. Callers `BallistaClientPool::acquire` a
+//! `PooledClient` guard; when the guard is dropped the underlying client is
+//! returned to the idle deque automatically.
+//!
+//! If the connection errored, call`PooledClient::discard` before dropping so
+//! the pool closes the channel rather than reusing it.
+//!
+//! A background tokio task evicts idle connections that have not been returned

Review Comment:
   I think this sentence may be inverted. The eviction task only sees 
connections that have already been returned to the idle deque. So it removes 
ones that have been sitting idle (returned but not re-acquired) for longer than 
`idle_timeout`. Maybe reword to something like 'evicts connections that have 
sat idle longer than `idle_timeout`'?



##########
ballista/executor/src/config.rs:
##########
@@ -151,6 +151,13 @@ pub struct Config {
         help = "Metric collection policy of this executor instance"
     )]
     pub metric_collection_policy: ExecutorMetricCollectionPolicy,
+    /// Number of second established client connection should be cached (0 means no cache)
+    #[arg(
+        long,
+        default_value_t = 0,
+        help = "Number of second established client connection should be cached (0 means no cache) "

Review Comment:
   Tiny typo: `Number of second` should be `Number of seconds`. The same 
wording also appears in the rustdoc just above on line 154, and in 
`ballista/executor/src/executor_process.rs` around line 141. Worth fixing all 
three since the help string shows up in `--help` output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

