milenkovicm commented on code in PR #1578: URL: https://github.com/apache/datafusion-ballista/pull/1578#discussion_r3152009573
########## ballista/core/src/client_pool.rs: ########## @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Connection pool for `BallistaClient` instances. +//! +//! `DefaultBallistaClientPool` maintains a `VecDeque` of idle clients per +//! `(host, port)` key backed by a `DashMap`. Callers obtain a `PooledClient` +//! guard from `BallistaClientPool::acquire`; when the guard is dropped, the +//! underlying client is returned to the idle deque automatically. +//! +//! Only `(host, port)` is used as the key: the gRPC configuration passed to +//! `acquire` is not part of it, so a reused client keeps the settings it was +//! originally created with. +//! +//! If the connection errored, call `PooledClient::discard` instead of dropping +//! the guard, so the client is dropped rather than returned to the pool. +//! +//! When enabled, a background tokio task evicts connections that have been +//! idle in the pool for longer than the configured `idle_timeout`.
+ +use crate::client::BallistaClient; +use crate::error::Result; +use crate::extension::BallistaConfigGrpcEndpoint; +use crate::utils::GrpcClientConfig; +use async_trait::async_trait; +use dashmap::DashMap; +use std::collections::VecDeque; +use std::fmt::Debug; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Weak}; +use std::time::{Duration, Instant}; + +// --------------------------------------------------------------------------- +// PooledClient guard +// --------------------------------------------------------------------------- + +/// A [BallistaClient] checked out from a pool. +/// +/// Implements [Deref] / [DerefMut] so it can be used exactly like a +/// [BallistaClient]. On drop, the inner client is returned to the pool +/// automatically. Call [PooledClient::discard] before dropping if the +/// connection should **not** be reused (e.g. after a transport error). +pub struct PooledClient { + client: BallistaClient, + /// Invoked in `Drop::drop` to push the client back into the idle deque. + /// `None` after `discard()` is called. + return_fn: Option<Box<dyn FnOnce(BallistaClient) + Send>>, +} + +impl PooledClient { + pub(crate) fn new( + client: BallistaClient, + return_fn: Box<dyn FnOnce(BallistaClient) + Send>, + ) -> Self { + Self { + client, + return_fn: Some(return_fn), + } + } + + /// Close the connection instead of returning it to the pool. 
+ pub fn discard(mut self) { + self.return_fn = None; + } +} + +impl Deref for PooledClient { + type Target = BallistaClient; + fn deref(&self) -> &Self::Target { + &self.client + } +} + +impl DerefMut for PooledClient { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.client + } +} + +impl Drop for PooledClient { + fn drop(&mut self) { + if let Some(f) = self.return_fn.take() { + let client = self.client.clone(); + f(client); + } + } +} + +// --------------------------------------------------------------------------- +// Trait +// --------------------------------------------------------------------------- + +/// Manages a pool of reusable [BallistaClient] connections. +#[async_trait] +pub trait BallistaClientPool: Send + Sync + Debug { + /// Acquire an idle client for `(host, port)`, or create a new one if the + /// pool is empty for that key. The returned [PooledClient] returns itself + /// to the pool on drop. + async fn acquire( + &self, + host: &str, + port: u16, + config: &GrpcClientConfig, + customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>, + ) -> Result<PooledClient>; + + /// Remove all idle clients that have been sitting unused longer than the + /// configured idle timeout. Called automatically by the background task in + /// [DefaultBallistaClientPool]; can also be invoked on demand. + async fn evict_idle(&self); +} + +// --------------------------------------------------------------------------- +// DefaultBallistaClientPool +// --------------------------------------------------------------------------- + +struct IdleEntry { + client: BallistaClient, + idle_since: Instant, +} + +type IdleMap = DashMap<(String, u16), VecDeque<IdleEntry>>; + +struct Inner { + idle: IdleMap, + idle_timeout: Duration, +} + +/// Default pool implementation. +/// +/// Keeps a `VecDeque<BallistaClient>` per `(host, port)`. Idle clients are +/// evicted by a background tokio task that runs at `idle_timeout / 3` +/// intervals (minimum 15 s). 
The task exits automatically when the pool `Arc` +/// is dropped. + +#[derive(Clone)] +pub struct DefaultBallistaClientPool { + inner: Arc<Inner>, +} + +impl Debug for DefaultBallistaClientPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultBallistaClientPool").finish() + } +} + +impl DefaultBallistaClientPool { + /// Create a pool that evicts connections idle longer than `idle_timeout`, + pub fn with_eviction_thread(idle_timeout: Duration) -> Self { + Self::new(idle_timeout, true) + } + + /// Create a pool that evicts connections idle longer than `idle_timeout`, + /// if `enable_eviction_thread` is enabled + pub fn new(idle_timeout: Duration, enable_eviction_thread: bool) -> Self { + let inner = Arc::new(Inner { + idle: DashMap::new(), + idle_timeout, + }); + + let weak: Weak<Inner> = Arc::downgrade(&inner); + // TODO: do we limit minimum interval here? + let check_interval = Duration::from_secs((idle_timeout.as_secs() / 3).max(15)); + + if enable_eviction_thread { + tokio::spawn(async move { + log::debug!( + "client connection pool - eviction thread started ... interval: {check_interval:?}" + ); + let mut ticker = tokio::time::interval(check_interval); + loop { + ticker.tick().await; + + match weak.upgrade() { + None => break, + Some(pool) => { + log::trace!("client connection pool - evicting connections"); + evict(&pool.idle, pool.idle_timeout) + } + } + } + log::debug!("client connection pool - eviction thread ... DONE"); + }); + } + + Self { inner } + } + + /// Total number of idle connections currently held across all endpoints. + pub fn idle_count(&self) -> usize { + self.inner.idle.iter().map(|e| e.value().len()).sum() + } +} + +fn evict(idle: &IdleMap, timeout: Duration) { + let deadline = Instant::now() + .checked_sub(timeout) + .unwrap_or_else(Instant::now); + + // Drain expired entries from the front of each deque (oldest = front). 
+ idle.retain(|_, deque| { + while deque.front().is_some_and(|e| e.idle_since <= deadline) { + deque.pop_front(); + } + !deque.is_empty() + }); +} + +#[async_trait] +impl BallistaClientPool for DefaultBallistaClientPool { + async fn acquire( + &self, + host: &str, + port: u16, + config: &GrpcClientConfig, + customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>, + ) -> Result<PooledClient> { + let key = (host.to_string(), port); + + // Pop the most-recently-used idle client. The DashMap shard lock is + // held only for the duration of the pop — released before the async + // BallistaClient::try_new call below. + let maybe_idle = self + .inner + .idle + .get_mut(&key) + .and_then(|mut deque| deque.pop_back()) + .map(|e| e.client); + + let client = match maybe_idle { + Some(c) => { Review Comment: Yes, this is a boundary case. We could probably use all configuration values as the key, but at the moment that looks like overkill. I would suggest keeping it as it is and just documenting the behaviour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
