This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to tag 0.5.0
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git
commit 357bda502c30f672d1dec1118591d2a8aeddda46 Author: FreeOnePlus <[email protected]> AuthorDate: Tue Jul 8 18:08:40 2025 +0800 at_eof bug fix --- doris_mcp_server/utils/db.py | 1122 +++++++++++----------------- doris_mcp_server/utils/schema_extractor.py | 50 +- 2 files changed, 465 insertions(+), 707 deletions(-) diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py index 0ef6834..08ed7db 100644 --- a/doris_mcp_server/utils/db.py +++ b/doris_mcp_server/utils/db.py @@ -29,6 +29,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, List +import random import aiomysql from aiomysql import Connection, Pool @@ -143,81 +144,39 @@ class DorisConnection: self.is_healthy = False return False - # Check 2: Comprehensive internal state validation - # This is critical for detecting at_eof issues before they cause errors - if not hasattr(self.connection, '_reader') or self.connection._reader is None: - self.logger.debug(f"Connection {self.session_id} has invalid _reader state") - self.is_healthy = False - return False - - # Check 3: Verify transport state - if (hasattr(self.connection._reader, '_transport') and - self.connection._reader._transport is None): - self.logger.debug(f"Connection {self.session_id} has invalid transport state") - self.is_healthy = False - return False - - # Check 4: Additional stream state validation - if (hasattr(self.connection._reader, 'at_eof') and - callable(self.connection._reader.at_eof)): - try: - # If the stream is already at EOF, the connection is broken - if self.connection._reader.at_eof(): - self.logger.debug(f"Connection {self.session_id} reader is at EOF") - self.is_healthy = False - return False - except Exception: - # If we can't even check at_eof, the connection is problematic - self.logger.debug(f"Connection {self.session_id} cannot check at_eof state") - self.is_healthy = False - return False - - # Check 5: Try to ping the connection with timeout + # Check 2: Use ONLY safe operations - avoid internal state access + # Instead of checking _reader state directly, use a simple query test try: - await asyncio.wait_for(self.connection.ping(), timeout=5) + # Use a simple query with timeout instead of ping() to avoid at_eof issues + async with asyncio.timeout(3): # 3 second timeout + async with self.connection.cursor() as cursor: + await cursor.execute("SELECT 1") + result = await cursor.fetchone() + if result and result[0] == 1: + self.is_healthy = True + return True + else: + self.logger.debug(f"Connection {self.session_id} ping query returned unexpected result") + self.is_healthy = False + return False + except asyncio.TimeoutError: - self.logger.debug(f"Connection {self.session_id} ping timeout") - self.is_healthy = False - return False - except Exception as ping_error: - # Check for specific error patterns - error_str = str(ping_error).lower() - if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']): - self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {ping_error}") - else: - self.logger.debug(f"Connection {self.session_id} ping failed: {ping_error}") + self.logger.debug(f"Connection {self.session_id} ping timed out") self.is_healthy = False return False - - # Check 6: Final validation with a simple query - try: - async with self.connection.cursor() as cursor: - await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3) - result = await asyncio.wait_for(cursor.fetchone(), 
timeout=3) - if not result or result[0] != 1: - self.logger.debug(f"Connection {self.session_id} test query returned invalid result") - self.is_healthy = False - return False except Exception as query_error: + # Check for specific at_eof related errors error_str = str(query_error).lower() - if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']): - self.logger.debug(f"Connection {self.session_id} test query failed with connection state error: {query_error}") + if 'at_eof' in error_str or 'nonetype' in error_str: + self.logger.debug(f"Connection {self.session_id} ping failed with at_eof error: {query_error}") else: - self.logger.debug(f"Connection {self.session_id} test query failed: {query_error}") + self.logger.debug(f"Connection {self.session_id} ping failed: {query_error}") self.is_healthy = False return False - - # If all checks pass, the connection is healthy - self.is_healthy = True - return True - + except Exception as e: - # Any uncaught exception means the connection is not healthy - error_str = str(e).lower() - if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']): - self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {e}") - else: - self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}") + # Catch any other unexpected errors + self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}") self.is_healthy = False return False @@ -231,605 +190,410 @@ class DorisConnection: class DorisConnectionManager: - """Doris database connection manager + """Doris database connection manager - Simplified Strategy - Provides connection pool management, connection health monitoring, fault recovery and other functions - Supports session-level connection reuse and intelligent load balancing - Integrates security manager to provide unified security validation and data masking + Uses direct connection pool management without session-level caching + Implements connection pool health monitoring and proactive cleanup """ def __init__(self, config, security_manager=None): self.config = config + self.security_manager = security_manager self.pool: Pool | None = None - self.session_connections: dict[str, DorisConnection] = {} - self.metrics = ConnectionMetrics() self.logger = logging.getLogger(__name__) - self.security_manager = security_manager - - # Enhanced health check configuration for long-connection issues - # Reduce health check interval to detect stale connections faster - self.health_check_interval = min(config.database.health_check_interval or 60, 30) # Max 30 seconds - self.max_connection_age = config.database.max_connection_age or 3600 - self.connection_timeout = config.database.connection_timeout or 30 + self.metrics = ConnectionMetrics() + + # Remove session-level connection management + # self.session_connections = {} # REMOVED + + # Pool health monitoring + self.health_check_interval = 30 # seconds + self.pool_warmup_size = 3 # connections to maintain + self.pool_health_check_task = None + self.pool_cleanup_task = None + + # Pool recovery lock to prevent race conditions + self.pool_recovery_lock = asyncio.Lock() + self.pool_recovering = False - # Add stale connection detection threshold (much shorter than MySQL's wait_timeout) - self.stale_connection_threshold = 900 # 15 minutes - connections older than this are considered stale + # Database connection parameters from config.database + self.host = config.database.host + 
self.port = config.database.port + self.user = config.database.user + self.password = config.database.password + self.database = config.database.database + # Convert charset to aiomysql compatible format + charset_map = {"UTF8": "utf8", "UTF8MB4": "utf8mb4"} + self.charset = charset_map.get(config.database.charset.upper(), config.database.charset.lower()) + self.connect_timeout = config.database.connection_timeout - # Start background tasks - self._health_check_task = None - self._cleanup_task = None + # Connection pool parameters - more conservative settings + self.minsize = config.database.min_connections # This is always 0 + self.maxsize = config.database.max_connections or 10 + self.pool_recycle = config.database.max_connection_age or 3600 # 1 hour, more conservative async def initialize(self): - """Initialize connection manager""" + """Initialize connection pool with health monitoring""" try: - self.logger.info(f"Initializing connection pool to {self.config.database.host}:{self.config.database.port}") - - # Validate configuration - if not self.config.database.host: - raise ValueError("Database host is required") - if not self.config.database.user: - raise ValueError("Database user is required") - if not self.config.database.password: - self.logger.warning("Database password is empty, this may cause connection issues") - - # Create connection pool with aggressive connection recycling to prevent at_eof issues - # Key changes: - # 1. Reduce pool_recycle to 30 minutes (1800 seconds) - much shorter than MySQL's wait_timeout - # 2. Add shorter connect_timeout to fail fast on bad connections - # 3. Enable autocommit to avoid transaction state issues + self.logger.info(f"Initializing connection pool to {self.host}:{self.port}") + + # Create connection pool self.pool = await aiomysql.create_pool( - host=self.config.database.host, - port=self.config.database.port, - user=self.config.database.user, - password=self.config.database.password, - db=self.config.database.database, - charset="utf8", - minsize=self.config.database.min_connections, # Always 0 per configuration to avoid at_eof issues - maxsize=self.config.database.max_connections or 20, - autocommit=True, - connect_timeout=15, # Shorter timeout to fail fast - # Aggressive connection recycling to prevent stale connections - pool_recycle=1800, # Recycle connections every 30 minutes instead of 2 hours - echo=False, # Don't echo SQL statements - ) - - # Test the connection pool with a more robust test - if not await self._robust_connection_test(): - raise RuntimeError("Connection pool robust test failed") - - self.logger.info( - f"Connection pool initialized successfully with aggressive recycling (30min), " - f"min connections: {self.config.database.min_connections}, " - f"max connections: {self.config.database.max_connections or 20}" + host=self.host, + port=self.port, + user=self.user, + password=self.password, + db=self.database, + charset=self.charset, + minsize=self.minsize, + maxsize=self.maxsize, + pool_recycle=self.pool_recycle, + connect_timeout=self.connect_timeout, + autocommit=True ) + + # Test initial connection + if not await self._test_pool_health(): + raise RuntimeError("Connection pool health check failed") + + # Start background monitoring tasks + self.pool_health_check_task = asyncio.create_task(self._pool_health_monitor()) + self.pool_cleanup_task = asyncio.create_task(self._pool_cleanup_monitor()) + + # Perform initial pool warmup + await self._warmup_pool() + + self.logger.info(f"Connection pool initialized successfully, 
min connections: {self.minsize}, max connections: {self.maxsize}") + + except Exception as e: + self.logger.error(f"Failed to initialize connection pool: {e}") + raise - # Start background monitoring tasks with more frequent health checks - self._health_check_task = asyncio.create_task(self._health_check_loop()) - self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + async def _test_pool_health(self) -> bool: + """Test connection pool health""" + try: + async with self.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT 1") + result = await cursor.fetchone() + return result and result[0] == 1 + except Exception as e: + self.logger.error(f"Pool health test failed: {e}") + return False + async def _warmup_pool(self): + """Warm up connection pool by creating initial connections""" + self.logger.info(f"๐ฅ Warming up connection pool with {self.pool_warmup_size} connections") + + warmup_connections = [] + try: + # Acquire connections to force pool to create them + for i in range(self.pool_warmup_size): + try: + conn = await self.pool.acquire() + warmup_connections.append(conn) + self.logger.debug(f"Warmed up connection {i+1}/{self.pool_warmup_size}") + except Exception as e: + self.logger.warning(f"Failed to warm up connection {i+1}: {e}") + break + + # Release all warmup connections back to pool + for conn in warmup_connections: + try: + self.pool.release(conn) + except Exception as e: + self.logger.warning(f"Failed to release warmup connection: {e}") + + self.logger.info(f"โ Pool warmup completed, {len(warmup_connections)} connections created") + except Exception as e: - self.logger.error(f"Connection pool initialization failed: {e}") - # Clean up partial initialization - if self.pool: + self.logger.error(f"Pool warmup failed: {e}") + # Clean up any remaining connections + for conn in warmup_connections: try: - self.pool.close() - await self.pool.wait_closed() + await conn.ensure_closed() except Exception: pass - self.pool = None - raise - async def _robust_connection_test(self) -> bool: - """Perform a robust connection test that validates full connection health""" - max_retries = 3 - for attempt in range(max_retries): + async def _pool_health_monitor(self): + """Background task to monitor pool health""" + self.logger.info("๐ฉบ Starting pool health monitor") + + while True: try: - self.logger.debug(f"Testing connection pool (attempt {attempt + 1}/{max_retries})") - - # Test connection creation and validation - test_conn = await self._create_raw_connection_with_validation() - if test_conn: - # Test basic query execution - async with test_conn.cursor() as cursor: - await cursor.execute("SELECT 1") - result = await cursor.fetchone() - if result and result[0] == 1: - self.logger.debug("Connection pool test successful") - # Return connection to pool - if self.pool: - self.pool.release(test_conn) - return True - else: - self.logger.warning("Connection test query returned unexpected result") - - # Close test connection if we get here - await test_conn.ensure_closed() - + await asyncio.sleep(self.health_check_interval) + await self._check_pool_health() + except asyncio.CancelledError: + self.logger.info("Pool health monitor stopped") + break except Exception as e: - self.logger.warning(f"Connection test attempt {attempt + 1} failed: {e}") - if attempt == max_retries - 1: - self.logger.error("All connection test attempts failed") - return False - else: - # Wait before retry - await asyncio.sleep(1.0 * (attempt + 1)) - - return False + 
self.logger.error(f"Pool health monitor error: {e}") - async def _create_raw_connection_with_validation(self, max_retries: int = 3): - """Create a raw connection with comprehensive validation""" - for attempt in range(max_retries): + async def _pool_cleanup_monitor(self): + """Background task to clean up stale connections""" + self.logger.info("๐งน Starting pool cleanup monitor") + + while True: try: - if not self.pool: - raise RuntimeError("Connection pool not initialized") - - # Acquire connection from pool - raw_connection = await self.pool.acquire() - - # Basic connection validation - if not raw_connection: - self.logger.warning(f"Pool returned None connection (attempt {attempt + 1})") - continue - - if raw_connection.closed: - self.logger.warning(f"Pool returned closed connection (attempt {attempt + 1})") - continue - - # Enhanced connection validation with multiple checks - try: - # Check 1: Verify connection object internal state - if not hasattr(raw_connection, '_reader') or raw_connection._reader is None: - self.logger.warning(f"Connection has invalid _reader state (attempt {attempt + 1})") - await raw_connection.ensure_closed() - continue - - # Check 2: Verify transport state - if (hasattr(raw_connection._reader, '_transport') and - raw_connection._reader._transport is None): - self.logger.warning(f"Connection has invalid transport state (attempt {attempt + 1})") - await raw_connection.ensure_closed() - continue - - # Check 3: Perform ping test to verify server-side connectivity - await raw_connection.ping() - - # Check 4: Test with actual query execution - async with raw_connection.cursor() as cursor: - await cursor.execute("SELECT 1") - result = await cursor.fetchone() - if result and result[0] == 1: - self.logger.debug(f"Successfully created and validated raw connection (attempt {attempt + 1})") - return raw_connection - else: - self.logger.warning(f"Connection test query failed (attempt {attempt + 1})") - await raw_connection.ensure_closed() - continue - - except Exception as e: - # Enhanced error detection for connection issues - error_str = str(e).lower() - - # Check for various connection-related errors - connection_error_keywords = [ - 'at_eof', 'nonetype', 'connection', 'transport', 'reader', - 'lost connection', 'broken pipe', 'connection reset', - 'timed out', 'connection refused', 'host unreachable' - ] - - is_connection_error = any(keyword in error_str for keyword in connection_error_keywords) - - if is_connection_error: - self.logger.warning(f"Connection validation failed with connection error (attempt {attempt + 1}): {e}") - else: - self.logger.warning(f"Connection validation failed (attempt {attempt + 1}): {e}") - - try: - await raw_connection.ensure_closed() - except Exception: - pass # Ignore cleanup errors - continue - + await asyncio.sleep(self.health_check_interval * 2) # Less frequent cleanup + await self._cleanup_stale_connections() + except asyncio.CancelledError: + self.logger.info("Pool cleanup monitor stopped") + break except Exception as e: - self.logger.warning(f"Raw connection creation attempt {attempt + 1} failed: {e}") - if attempt == max_retries - 1: - raise RuntimeError(f"Failed to create valid connection after {max_retries} attempts: {e}") - else: - # Exponential backoff with jitter to avoid thundering herd - base_delay = 0.5 * (2 ** attempt) - jitter = base_delay * 0.1 * (0.5 - asyncio.get_running_loop().time() % 1) - await asyncio.sleep(base_delay + jitter) - - raise RuntimeError("Failed to create valid connection") - - async def 
get_connection(self, session_id: str) -> DorisConnection: - """Get database connection with enhanced reliability - - Supports session-level connection reuse to improve performance and consistency - """ - # Check if there's an existing session connection - if session_id in self.session_connections: - conn = self.session_connections[session_id] - # Enhanced connection health check - if await self._comprehensive_connection_health_check(conn): - return conn - else: - # Connection is unhealthy, clean up and create new one - self.logger.debug(f"Existing connection unhealthy for session {session_id}, creating new one") - await self._cleanup_session_connection(session_id) + self.logger.error(f"Pool cleanup monitor error: {e}") - # Create new connection with retry logic - return await self._create_new_connection_with_retry(session_id) - - async def _comprehensive_connection_health_check(self, conn: DorisConnection) -> bool: - """Perform comprehensive connection health check""" + async def _check_pool_health(self): + """Check and maintain pool health""" try: - # Check basic connection state - if not conn.connection or conn.connection.closed: - return False - - # Instead of checking internal state, perform a simple ping test - # This is more reliable and less dependent on aiomysql internals - if not await conn.ping(): - return False - - return True + # Skip health check if already recovering + if self.pool_recovering: + self.logger.debug("Pool recovery in progress, skipping health check") + return + + # Test pool with a simple query + health_ok = await self._test_pool_health() - except Exception as e: - # Check for at_eof errors specifically - error_str = str(e).lower() - if 'at_eof' in error_str: - self.logger.debug(f"Connection health check failed with at_eof error: {e}") + if health_ok: + self.logger.debug("โ Pool health check passed") + self.metrics.last_health_check = datetime.utcnow() else: - self.logger.debug(f"Connection health check failed: {e}") - return False - - async def _create_new_connection_with_retry(self, session_id: str, max_retries: int = 3) -> DorisConnection: - """Create new database connection with retry logic""" - for attempt in range(max_retries): - try: - # Get validated raw connection - raw_connection = await self._create_raw_connection_with_validation() + self.logger.warning("โ Pool health check failed, attempting recovery") + await self._recover_pool() - # Create wrapped connection - doris_conn = DorisConnection(raw_connection, session_id, self.security_manager) - - # Comprehensive connection test - if await self._comprehensive_connection_health_check(doris_conn): - # Store in session connections - self.session_connections[session_id] = doris_conn - self.metrics.total_connections += 1 - self.logger.debug(f"Successfully created new connection for session: {session_id}") - return doris_conn - else: - # Connection failed health check, clean up and retry - self.logger.warning(f"New connection failed health check for session {session_id} (attempt {attempt + 1})") - try: - await doris_conn.close() - except Exception: - pass - - except Exception as e: - self.logger.warning(f"Connection creation attempt {attempt + 1} failed for session {session_id}: {e}") - if attempt == max_retries - 1: - self.metrics.connection_errors += 1 - raise RuntimeError(f"Failed to create connection for session {session_id} after {max_retries} attempts: {e}") - else: - # Exponential backoff - await asyncio.sleep(0.5 * (2 ** attempt)) - - raise RuntimeError(f"Unexpected failure in connection creation for 
session {session_id}") - - async def release_connection(self, session_id: str): - """Release session connection""" - if session_id in self.session_connections: - await self._cleanup_session_connection(session_id) + except Exception as e: + self.logger.error(f"Pool health check error: {e}") + await self._recover_pool() - async def _cleanup_session_connection(self, session_id: str): - """Clean up session connection with enhanced safety""" - if session_id in self.session_connections: - conn = self.session_connections[session_id] - try: - # Simplified connection validation before returning to pool - connection_healthy = False + async def _cleanup_stale_connections(self): + """Proactively clean up potentially stale connections""" + try: + self.logger.debug("๐งน Checking for stale connections") + + # Get pool statistics + pool_size = self.pool.size + pool_free = self.pool.freesize + + # If pool has idle connections, test some of them + if pool_free > 0: + test_count = min(pool_free, 2) # Test up to 2 idle connections - if (self.pool and - conn.connection and - not conn.connection.closed): - - # Test if connection is still healthy with a simple check + for i in range(test_count): try: - # Quick ping test to see if connection is usable - async with conn.connection.cursor() as cursor: - await cursor.execute("SELECT 1") + # Acquire connection, test it, and release + conn = await asyncio.wait_for(self.pool.acquire(), timeout=5) + + # Quick test + async with conn.cursor() as cursor: + await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3) await cursor.fetchone() - connection_healthy = True - except Exception as test_error: - self.logger.debug(f"Connection health test failed for session {session_id}: {test_error}") - connection_healthy = False - - if connection_healthy: - # Connection appears healthy, return to pool - try: - self.pool.release(conn.connection) - self.logger.debug(f"Successfully returned connection to pool for session {session_id}") - except Exception as pool_error: - self.logger.debug(f"Failed to return connection to pool for session {session_id}: {pool_error}") + + # Connection is healthy, release it + self.pool.release(conn) + + except asyncio.TimeoutError: + self.logger.debug(f"Stale connection test {i+1} timed out") try: - await conn.connection.ensure_closed() + await conn.ensure_closed() + except Exception: + pass + except Exception as e: + self.logger.debug(f"Stale connection test {i+1} failed: {e}") + try: + await conn.ensure_closed() except Exception: pass - else: - # Connection is unhealthy, force close - self.logger.debug(f"Connection unhealthy for session {session_id}, force closing") - try: - if conn.connection and not conn.connection.closed: - await conn.connection.ensure_closed() - except Exception: - pass # Ignore errors during forced close - # Close connection wrapper - await conn.close() + self.logger.debug(f"Stale connection cleanup completed, tested {test_count} connections") - except Exception as e: - self.logger.error(f"Error cleaning up connection for session {session_id}: {e}") - # Force close if normal cleanup fails - try: - if conn.connection and not conn.connection.closed: - await conn.connection.ensure_closed() - except Exception: - pass # Ignore errors during forced close - finally: - # Remove from session connections - del self.session_connections[session_id] - self.logger.debug(f"Cleaned up connection for session: {session_id}") - - async def _health_check_loop(self): - """Background health check loop""" - while True: - try: - await 
asyncio.sleep(self.health_check_interval) - await self._perform_health_check() - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Health check error: {e}") - - async def _perform_health_check(self): - """Perform enhanced health check with aggressive stale connection detection""" - try: - unhealthy_sessions = [] - stale_sessions = [] - current_time = datetime.utcnow() - - # Enhanced health check with comprehensive validation - for session_id, conn in self.session_connections.items(): - try: - # Check 1: Basic connection health - if not await self._comprehensive_connection_health_check(conn): - unhealthy_sessions.append(session_id) - self.logger.debug(f"Session {session_id} marked as unhealthy") - continue - - # Check 2: Stale connection detection (much more aggressive) - time_since_last_use = (current_time - conn.last_used).total_seconds() - connection_age = (current_time - conn.created_at).total_seconds() - - # Mark as stale if: - # 1. Last used more than 15 minutes ago, OR - # 2. Connection age exceeds maximum age, OR - # 3. Connection hasn't been used in a while and is old - if (time_since_last_use > self.stale_connection_threshold or - connection_age > self.max_connection_age or - (time_since_last_use > 300 and connection_age > 1800)): # 5 min unused + 30 min old - - # For stale connections, do an extra validation - try: - # Try a more aggressive ping test - async with conn.connection.cursor() as cursor: - await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3) - await asyncio.wait_for(cursor.fetchone(), timeout=3) - # If we get here, connection is actually healthy despite being stale - self.logger.debug(f"Stale connection {session_id} passed extra validation") - except Exception as stale_test_error: - stale_sessions.append(session_id) - self.logger.debug(f"Session {session_id} marked as stale: {stale_test_error}") - continue - - except Exception as check_error: - # If we can't even check the connection, it's definitely problematic - self.logger.warning(f"Health check failed for session {session_id}: {check_error}") - unhealthy_sessions.append(session_id) - - all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions)) - - # Clean up problematic connections - cleanup_results = {"success": 0, "failed": 0} - for session_id in all_problematic_sessions: - try: - await self._cleanup_session_connection(session_id) - cleanup_results["success"] += 1 - self.metrics.failed_connections += 1 - except Exception as cleanup_error: - cleanup_results["failed"] += 1 - self.logger.error(f"Failed to cleanup session {session_id}: {cleanup_error}") - - # Update metrics - await self._update_connection_metrics() - self.metrics.last_health_check = datetime.utcnow() - - # Log results - if all_problematic_sessions: - self.logger.warning( - f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and " - f"{len(stale_sessions)} stale connections " - f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})" - ) - else: - self.logger.debug(f"Health check: all {len(self.session_connections)} connections healthy") - - # If we have a lot of connection failures, log some diagnostic info - if self.metrics.connection_errors > 50: # Threshold for diagnostic logging - self.logger.warning( - f"High connection error count detected: {self.metrics.connection_errors}. " - f"This may indicate persistent connectivity issues with the database." 
- ) - except Exception as e: - self.logger.error(f"Health check failed: {e}") - # If health check fails, try to diagnose the issue - try: - diagnosis = await self.diagnose_connection_health() - self.logger.error(f"Connection diagnosis: {diagnosis}") - except Exception: - pass # Don't let diagnosis failure crash health check - - async def _cleanup_loop(self): - """Background cleanup loop with more frequent execution""" - while True: - try: - # Run cleanup more frequently - every 2 minutes instead of 5 - await asyncio.sleep(120) # Run every 2 minutes - await self._cleanup_idle_connections() - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Cleanup loop error: {e}") - - async def _cleanup_idle_connections(self): - """Clean up idle connections with more aggressive criteria""" - current_time = datetime.utcnow() - idle_sessions = [] - - for session_id, conn in self.session_connections.items(): - try: - # Enhanced idle connection detection - connection_age = (current_time - conn.created_at).total_seconds() - time_since_last_use = (current_time - conn.last_used).total_seconds() + self.logger.error(f"Stale connection cleanup error: {e}") + + async def _recover_pool(self): + """Recover connection pool when health check fails""" + # Use lock to prevent concurrent recovery attempts + async with self.pool_recovery_lock: + # Check if another recovery is already in progress + if self.pool_recovering: + self.logger.debug("Pool recovery already in progress, waiting...") + return - # Mark as idle if: - # 1. Connection has exceeded maximum age, OR - # 2. Connection hasn't been used for more than 20 minutes, OR - # 3. Connection is old and hasn't been used recently - should_cleanup = ( - connection_age > self.max_connection_age or - time_since_last_use > 1200 or # 20 minutes unused - (connection_age > 1800 and time_since_last_use > 600) # 30 min old + 10 min unused - ) + try: + self.pool_recovering = True + max_retries = 3 + retry_delay = 5 # seconds - if should_cleanup: - # Before marking for cleanup, try a quick health check + for attempt in range(max_retries): try: - # Quick validation - if this fails, definitely cleanup - if not conn.connection or conn.connection.closed: - idle_sessions.append(session_id) - continue - - # Quick ping test with timeout - await asyncio.wait_for(conn.connection.ping(), timeout=2) + self.logger.info(f"๐ Attempting pool recovery (attempt {attempt + 1}/{max_retries})") + + # Try to close existing pool with timeout + if self.pool: + try: + if not self.pool.closed: + self.pool.close() + await asyncio.wait_for(self.pool.wait_closed(), timeout=3.0) + self.logger.debug("Old pool closed successfully") + except asyncio.TimeoutError: + self.logger.warning("Pool close timeout, forcing cleanup") + except Exception as e: + self.logger.warning(f"Error closing old pool: {e}") + finally: + self.pool = None - # If ping succeeds but connection is still very old, cleanup anyway - if connection_age > self.max_connection_age: - idle_sessions.append(session_id) - self.logger.debug(f"Cleaning up old but healthy connection for session {session_id}") + # Wait before creating new pool (reduced delay) + if attempt > 0: + await asyncio.sleep(2) # Reduced from 5 to 2 seconds + + # Recreate pool with timeout + self.logger.debug("Creating new connection pool...") + self.pool = await asyncio.wait_for( + aiomysql.create_pool( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + db=self.database, + charset=self.charset, + minsize=self.minsize, 
+ maxsize=self.maxsize, + pool_recycle=self.pool_recycle, + connect_timeout=self.connect_timeout, + autocommit=True + ), + timeout=10.0 + ) + + # Test recovered pool with timeout + if await asyncio.wait_for(self._test_pool_health(), timeout=5.0): + self.logger.info(f"โ Pool recovery successful on attempt {attempt + 1}") + # Re-warm the pool with timeout + try: + await asyncio.wait_for(self._warmup_pool(), timeout=5.0) + except asyncio.TimeoutError: + self.logger.warning("Pool warmup timeout, but recovery successful") + return else: - self.logger.debug(f"Keeping healthy connection for session {session_id}") + self.logger.warning(f"โ Pool recovery health check failed on attempt {attempt + 1}") - except Exception as health_error: - # Health check failed, definitely cleanup - idle_sessions.append(session_id) - self.logger.debug(f"Cleanup marking session {session_id} due to health check failure: {health_error}") + except asyncio.TimeoutError: + self.logger.error(f"Pool recovery attempt {attempt + 1} timed out") + if self.pool: + try: + self.pool.close() + except: + pass + self.pool = None + except Exception as e: + self.logger.error(f"Pool recovery error on attempt {attempt + 1}: {e}") - except Exception as e: - self.logger.warning(f"Error checking connection {session_id} for cleanup: {e}") - # If we can't even check it, it's probably broken - idle_sessions.append(session_id) - - # Clean up idle connections - cleanup_results = {"success": 0, "failed": 0} - for session_id in idle_sessions: - try: - await self._cleanup_session_connection(session_id) - cleanup_results["success"] += 1 - except Exception as cleanup_error: - cleanup_results["failed"] += 1 - self.logger.error(f"Failed to cleanup idle session {session_id}: {cleanup_error}") - - if idle_sessions: - self.logger.info( - f"Cleaned up {len(idle_sessions)} idle connections " - f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})" - ) - - async def _update_connection_metrics(self): - """Update connection metrics""" - self.metrics.active_connections = len(self.session_connections) - if self.pool: - self.metrics.idle_connections = self.pool.freesize - - async def get_metrics(self) -> ConnectionMetrics: - """Get connection metrics""" - await self._update_connection_metrics() - return self.metrics + # Clean up failed pool + if self.pool: + try: + self.pool.close() + await asyncio.wait_for(self.pool.wait_closed(), timeout=2.0) + except Exception: + pass + finally: + self.pool = None + + # All recovery attempts failed + self.logger.error("โ Pool recovery failed after all attempts") + self.pool = None + + finally: + self.pool_recovering = False - async def execute_query( - self, session_id: str, sql: str, params: tuple | None = None, auth_context=None - ) -> QueryResult: - """Execute query with enhanced error handling and retry logic""" - max_retries = 2 - for attempt in range(max_retries): - try: - conn = await self.get_connection(session_id) - return await conn.execute(sql, params, auth_context) - except Exception as e: - error_msg = str(e).lower() - # Check for connection-related errors that warrant retry - is_connection_error = any(keyword in error_msg for keyword in [ - 'at_eof', 'connection', 'closed', 'nonetype', 'reader', 'transport' - ]) + async def get_connection(self, session_id: str) -> DorisConnection: + """Get database connection - Simplified Strategy with pool validation + + Always acquire fresh connection from pool, no session caching + """ + try: + # Wait for any ongoing recovery to complete + if 
self.pool_recovering: + self.logger.debug(f"Pool recovery in progress, waiting for completion...") + # Wait for recovery to complete (max 10 seconds) + for _ in range(10): + if not self.pool_recovering: + break + await asyncio.sleep(0.5) - if is_connection_error and attempt < max_retries - 1: - self.logger.warning(f"Connection error during query execution (attempt {attempt + 1}): {e}") - # Clean up the problematic connection - await self.release_connection(session_id) - # Wait before retry - await asyncio.sleep(0.5 * (attempt + 1)) - continue - else: - # Not a connection error or final retry - re-raise - raise + if self.pool_recovering: + self.logger.error("Pool recovery is taking too long, proceeding anyway") + # Don't raise error, try to continue + + # Check if pool is available + if not self.pool: + self.logger.warning("Connection pool is not available, attempting recovery...") + await self._recover_pool() + + if not self.pool: + raise RuntimeError("Connection pool is not available and recovery failed") + + # Check if pool is closed + if self.pool.closed: + self.logger.warning("Connection pool is closed, attempting recovery...") + await self._recover_pool() + + if not self.pool or self.pool.closed: + raise RuntimeError("Connection pool is closed and recovery failed") + + # Simple strategy: always get fresh connection from pool + raw_conn = await self.pool.acquire() + + # Wrap in DorisConnection + doris_conn = DorisConnection(raw_conn, session_id, self.security_manager) + + # Simple validation - just check if connection is open + if raw_conn.closed: + raise RuntimeError("Acquired connection is already closed") + + self.logger.debug(f"โ Acquired fresh connection for session {session_id}") + return doris_conn + + except Exception as e: + self.logger.error(f"Failed to get connection for session {session_id}: {e}") + raise - @asynccontextmanager - async def get_connection_context(self, session_id: str): - """Get connection context manager""" - conn = await self.get_connection(session_id) + async def release_connection(self, session_id: str, connection: DorisConnection): + """Release connection back to pool - Simplified Strategy""" try: - yield conn - finally: - # Connection will be reused, no need to close here - pass + if connection and connection.connection: + # Simple strategy: always return to pool + if not connection.connection.closed: + self.pool.release(connection.connection) + self.logger.debug(f"โ Released connection for session {session_id}") + else: + self.logger.debug(f"Connection already closed for session {session_id}") + + except Exception as e: + self.logger.error(f"Error releasing connection for session {session_id}: {e}") + # Force close if release fails + try: + if connection and connection.connection: + await connection.connection.ensure_closed() + except Exception: + pass async def close(self): """Close connection manager""" try: # Cancel background tasks - if self._health_check_task: - self._health_check_task.cancel() + if self.pool_health_check_task: + self.pool_health_check_task.cancel() try: - await self._health_check_task + await self.pool_health_check_task except asyncio.CancelledError: pass - if self._cleanup_task: - self._cleanup_task.cancel() + if self.pool_cleanup_task: + self.pool_cleanup_task.cancel() try: - await self._cleanup_task + await self.pool_cleanup_task except asyncio.CancelledError: pass - # Clean up all session connections - for session_id in list(self.session_connections.keys()): - await self._cleanup_session_connection(session_id) - # Close 
connection pool if self.pool: self.pool.close() @@ -842,15 +606,62 @@ class DorisConnectionManager: async def test_connection(self) -> bool: """Test database connection using robust connection test""" - return await self._robust_connection_test() + return await self._test_pool_health() + + async def get_metrics(self) -> ConnectionMetrics: + """Get connection pool metrics - Simplified Strategy""" + try: + if self.pool: + self.metrics.idle_connections = self.pool.freesize + self.metrics.active_connections = self.pool.size - self.pool.freesize + else: + self.metrics.idle_connections = 0 + self.metrics.active_connections = 0 + + return self.metrics + except Exception as e: + self.logger.error(f"Error getting metrics: {e}") + return self.metrics + + async def execute_query( + self, session_id: str, sql: str, params: tuple | None = None, auth_context=None + ) -> QueryResult: + """Execute query - Simplified Strategy with automatic connection management""" + connection = None + try: + # Always get fresh connection from pool + connection = await self.get_connection(session_id) + + # Execute query + result = await connection.execute(sql, params, auth_context) + + return result + + except Exception as e: + self.logger.error(f"Query execution failed for session {session_id}: {e}") + raise + finally: + # Always release connection back to pool + if connection: + await self.release_connection(session_id, connection) + + @asynccontextmanager + async def get_connection_context(self, session_id: str): + """Get connection context manager - Simplified Strategy""" + connection = None + try: + connection = await self.get_connection(session_id) + yield connection + finally: + if connection: + await self.release_connection(session_id, connection) async def diagnose_connection_health(self) -> Dict[str, Any]: - """Diagnose connection pool and session health""" + """Diagnose connection pool health - Simplified Strategy""" diagnosis = { "timestamp": datetime.utcnow().isoformat(), "pool_status": "unknown", - "session_connections": {}, - "problematic_connections": [], + "pool_info": {}, "recommendations": [] } @@ -874,55 +685,16 @@ class DorisConnectionManager: "max_size": self.pool.maxsize } - # Check session connections - problematic_sessions = [] - for session_id, conn in self.session_connections.items(): - conn_status = { - "session_id": session_id, - "created_at": conn.created_at.isoformat(), - "last_used": conn.last_used.isoformat(), - "query_count": conn.query_count, - "is_healthy": conn.is_healthy - } - - # Detailed connection checks - if conn.connection: - conn_status["connection_closed"] = conn.connection.closed - conn_status["has_reader"] = hasattr(conn.connection, '_reader') and conn.connection._reader is not None - - if hasattr(conn.connection, '_reader') and conn.connection._reader: - conn_status["reader_transport"] = conn.connection._reader._transport is not None - else: - conn_status["reader_transport"] = False - else: - conn_status["connection_closed"] = True - conn_status["has_reader"] = False - conn_status["reader_transport"] = False - - # Check if connection is problematic - if (not conn.is_healthy or - conn_status["connection_closed"] or - not conn_status["has_reader"] or - not conn_status["reader_transport"]): - problematic_sessions.append(session_id) - diagnosis["problematic_connections"].append(conn_status) - - diagnosis["session_connections"][session_id] = conn_status - - # Generate recommendations - if problematic_sessions: - diagnosis["recommendations"].append(f"Clean up 
{len(problematic_sessions)} problematic connections") - + # Generate recommendations based on pool status if self.pool.freesize == 0 and self.pool.size >= self.pool.maxsize: diagnosis["recommendations"].append("Connection pool exhausted - consider increasing max_connections") - # Auto-cleanup problematic connections - for session_id in problematic_sessions: - try: - await self._cleanup_session_connection(session_id) - self.logger.info(f"Auto-cleaned problematic connection for session: {session_id}") - except Exception as e: - self.logger.error(f"Failed to auto-clean session {session_id}: {e}") + # Test pool health + if await self._test_pool_health(): + diagnosis["pool_health"] = "healthy" + else: + diagnosis["pool_health"] = "unhealthy" + diagnosis["recommendations"].append("Pool health check failed - may need recovery") return diagnosis @@ -949,7 +721,8 @@ class ConnectionPoolMonitor: status = { "pool_size": self.connection_manager.pool.size if self.connection_manager.pool else 0, "free_connections": self.connection_manager.pool.freesize if self.connection_manager.pool else 0, - "active_sessions": len(self.connection_manager.session_connections), + "active_connections": metrics.active_connections, + "idle_connections": metrics.idle_connections, "total_connections": metrics.total_connections, "failed_connections": metrics.failed_connections, "connection_errors": metrics.connection_errors, @@ -960,54 +733,33 @@ class ConnectionPoolMonitor: return status async def get_session_details(self) -> list[dict[str, Any]]: - """Get session connection details""" - sessions = [] - - for session_id, conn in self.connection_manager.session_connections.items(): - session_info = { - "session_id": session_id, - "created_at": conn.created_at.isoformat(), - "last_used": conn.last_used.isoformat(), - "query_count": conn.query_count, - "is_healthy": conn.is_healthy, - "connection_age": (datetime.utcnow() - conn.created_at).total_seconds(), - } - sessions.append(session_info) - - return sessions + """Get session connection details - Simplified Strategy (No session caching)""" + # In simplified strategy, we don't maintain session connections + # Return empty list as connections are managed by the pool directly + return [] async def generate_health_report(self) -> dict[str, Any]: - """Generate connection health report""" + """Generate connection health report - Simplified Strategy""" pool_status = await self.get_pool_status() - session_details = await self.get_session_details() - # Calculate health statistics - healthy_sessions = sum(1 for s in session_details if s["is_healthy"]) - total_sessions = len(session_details) - health_ratio = healthy_sessions / total_sessions if total_sessions > 0 else 1.0 + # Calculate pool utilization + pool_utilization = 1.0 - (pool_status["free_connections"] / pool_status["pool_size"]) if pool_status["pool_size"] > 0 else 0.0 report = { "timestamp": datetime.utcnow().isoformat(), "pool_status": pool_status, - "session_summary": { - "total_sessions": total_sessions, - "healthy_sessions": healthy_sessions, - "health_ratio": health_ratio, - }, - "session_details": session_details, + "pool_utilization": pool_utilization, "recommendations": [], } - # Add recommendations based on health status - if health_ratio < 0.8: - report["recommendations"].append("Consider checking database connectivity and network stability") - + # Add recommendations based on pool status if pool_status["connection_errors"] > 10: report["recommendations"].append("High connection error rate detected, review 
connection configuration") - if pool_status["active_sessions"] > pool_status["pool_size"] * 0.9: + if pool_utilization > 0.9: report["recommendations"].append("Connection pool utilization is high, consider increasing pool size") - return report - - + if pool_status["free_connections"] == 0: + report["recommendations"].append("No free connections available, consider increasing pool size") + + return report \ No newline at end of file diff --git a/doris_mcp_server/utils/schema_extractor.py b/doris_mcp_server/utils/schema_extractor.py index fd711c8..db973e7 100644 --- a/doris_mcp_server/utils/schema_extractor.py +++ b/doris_mcp_server/utils/schema_extractor.py @@ -1215,33 +1215,39 @@ class MetadataExtractor: try: if self.connection_manager: import asyncio + import concurrent.futures + import threading - # Try to run the async query - try: - # Check if there's a running event loop - loop = asyncio.get_running_loop() - # If we're in an async context, we need to run in a separate thread - import concurrent.futures - - def run_in_new_loop(): - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) + # Always run in a separate thread with new event loop to avoid conflicts + def run_in_new_loop(): + # Create new event loop for this thread + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + return new_loop.run_until_complete( + self._execute_query_async(query, db_name, return_dataframe) + ) + finally: try: - return new_loop.run_until_complete( - self._execute_query_async(query, db_name, return_dataframe) - ) + # Properly close the loop + pending = asyncio.all_tasks(new_loop) + if pending: + new_loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) finally: new_loop.close() - - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit(run_in_new_loop) + + # Use ThreadPoolExecutor to run in separate thread + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_in_new_loop) + try: return future.result(timeout=30) - - except RuntimeError: - # No running loop, we can safely create one - return asyncio.run( - self._execute_query_async(query, db_name, return_dataframe) - ) + except concurrent.futures.TimeoutError: + logger.error("Query execution timed out after 30 seconds") + if return_dataframe: + import pandas as pd + return pd.DataFrame() + else: + return [] else: # Fallback: Return empty result logger.warning("No connection manager provided, returning empty result") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
