This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to tag 0.5.0
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git
commit aa953e9fe1f30124300a7aa704a56febec0046e2
Author: FreeOnePlus <[email protected]>
AuthorDate: Thu Jul 3 21:31:54 2025 +0800

    at_eof bug fix
---
 doris_mcp_server/utils/db.py | 295 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 238 insertions(+), 57 deletions(-)

diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index e04b4df..0ef6834 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -70,6 +70,7 @@ class DorisConnection:
         self.query_count = 0
         self.is_healthy = True
         self.security_manager = security_manager
+        self.logger = logging.getLogger(__name__)
 
     async def execute(self, sql: str, params: tuple | None = None, auth_context=None) -> QueryResult:
         """Execute SQL query"""
@@ -135,31 +136,88 @@ class DorisConnection:
             raise
 
     async def ping(self) -> bool:
-        """Check connection health status"""
+        """Check connection health status with enhanced at_eof error detection"""
         try:
-            # Check if connection exists and is not closed
+            # Check 1: Connection exists and is not closed
             if not self.connection or self.connection.closed:
                 self.is_healthy = False
                 return False
 
-            # Check if connection has _reader (aiomysql internal state)
-            # This prevents the 'NoneType' object has no attribute 'at_eof' error
+            # Check 2: Comprehensive internal state validation
+            # This is critical for detecting at_eof issues before they cause errors
             if not hasattr(self.connection, '_reader') or self.connection._reader is None:
+                self.logger.debug(f"Connection {self.session_id} has invalid _reader state")
+                self.is_healthy = False
+                return False
+
+            # Check 3: Verify transport state
+            if (hasattr(self.connection._reader, '_transport') and
+                    self.connection._reader._transport is None):
+                self.logger.debug(f"Connection {self.session_id} has invalid transport state")
+                self.is_healthy = False
+                return False
+
+            # Check 4: Additional stream state validation
+            if (hasattr(self.connection._reader, 'at_eof') and
+                    callable(self.connection._reader.at_eof)):
+                try:
+                    # If the stream is already at EOF, the connection is broken
+                    if self.connection._reader.at_eof():
+                        self.logger.debug(f"Connection {self.session_id} reader is at EOF")
+                        self.is_healthy = False
+                        return False
+                except Exception:
+                    # If we can't even check at_eof, the connection is problematic
+                    self.logger.debug(f"Connection {self.session_id} cannot check at_eof state")
+                    self.is_healthy = False
+                    return False
+
+            # Check 5: Try to ping the connection with timeout
+            try:
+                await asyncio.wait_for(self.connection.ping(), timeout=5)
+            except asyncio.TimeoutError:
+                self.logger.debug(f"Connection {self.session_id} ping timeout")
+                self.is_healthy = False
+                return False
+            except Exception as ping_error:
+                # Check for specific error patterns
+                error_str = str(ping_error).lower()
+                if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']):
+                    self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {ping_error}")
+                else:
+                    self.logger.debug(f"Connection {self.session_id} ping failed: {ping_error}")
                 self.is_healthy = False
                 return False
 
-            # Additional check for reader's state
-            if hasattr(self.connection._reader, '_transport') and self.connection._reader._transport is None:
+            # Check 6: Final validation with a simple query
+            try:
+                async with self.connection.cursor() as cursor:
+                    await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
+                    result = await asyncio.wait_for(cursor.fetchone(), timeout=3)
+                    if not result or result[0] != 1:
self.logger.debug(f"Connection {self.session_id} test query returned invalid result") + self.is_healthy = False + return False + except Exception as query_error: + error_str = str(query_error).lower() + if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']): + self.logger.debug(f"Connection {self.session_id} test query failed with connection state error: {query_error}") + else: + self.logger.debug(f"Connection {self.session_id} test query failed: {query_error}") self.is_healthy = False return False - # Try to ping the connection - await self.connection.ping() + # If all checks pass, the connection is healthy self.is_healthy = True return True - except (AttributeError, OSError, ConnectionError, Exception) as e: - # Log the specific error for debugging - logging.debug(f"Connection ping failed for session {self.session_id}: {e}") + + except Exception as e: + # Any uncaught exception means the connection is not healthy + error_str = str(e).lower() + if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']): + self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {e}") + else: + self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}") self.is_healthy = False return False @@ -188,11 +246,15 @@ class DorisConnectionManager: self.logger = logging.getLogger(__name__) self.security_manager = security_manager - # Health check configuration - self.health_check_interval = config.database.health_check_interval or 60 + # Enhanced health check configuration for long-connection issues + # Reduce health check interval to detect stale connections faster + self.health_check_interval = min(config.database.health_check_interval or 60, 30) # Max 30 seconds self.max_connection_age = config.database.max_connection_age or 3600 self.connection_timeout = config.database.connection_timeout or 30 - + + # Add stale connection detection threshold (much shorter than MySQL's wait_timeout) + self.stale_connection_threshold = 900 # 15 minutes - connections older than this are considered stale + # Start background tasks self._health_check_task = None self._cleanup_task = None @@ -210,8 +272,11 @@ class DorisConnectionManager: if not self.config.database.password: self.logger.warning("Database password is empty, this may cause connection issues") - # Create connection pool with improved stability parameters - # Key change: Set minsize=0 to avoid pre-creation issues that cause at_eof errors + # Create connection pool with aggressive connection recycling to prevent at_eof issues + # Key changes: + # 1. Reduce pool_recycle to 30 minutes (1800 seconds) - much shorter than MySQL's wait_timeout + # 2. Add shorter connect_timeout to fail fast on bad connections + # 3. 
+        # 3. Enable autocommit to avoid transaction state issues
         self.pool = await aiomysql.create_pool(
             host=self.config.database.host,
             port=self.config.database.port,
@@ -220,12 +285,11 @@ class DorisConnectionManager:
             db=self.config.database.database,
             charset="utf8",
             minsize=self.config.database.min_connections,  # Always 0 per configuration to avoid at_eof issues
-            maxsize=self.config.database.max_connections or 20,
             autocommit=True,
-            connect_timeout=self.connection_timeout,
-            # Enhanced stability parameters
-            pool_recycle=7200,  # Recycle connections every 2 hours
+            connect_timeout=15,  # Shorter timeout to fail fast
+            # Aggressive connection recycling to prevent stale connections
+            pool_recycle=1800,  # Recycle connections every 30 minutes instead of 2 hours
             echo=False,  # Don't echo SQL statements
         )
@@ -234,13 +298,12 @@ class DorisConnectionManager:
             raise RuntimeError("Connection pool robust test failed")
 
         self.logger.info(
-            f"Connection pool initialized successfully with on-demand connection creation, "
+            f"Connection pool initialized successfully with aggressive recycling (30min), "
             f"min connections: {self.config.database.min_connections}, "
-            f"max connections: {self.config.database.max_connections or 20}"
         )
 
-        # Start background monitoring tasks
+        # Start background monitoring tasks with more frequent health checks
        self._health_check_task = asyncio.create_task(self._health_check_loop())
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())
 
@@ -312,10 +375,25 @@ class DorisConnectionManager:
                     self.logger.warning(f"Pool returned closed connection (attempt {attempt + 1})")
                     continue
 
-                # Perform a simple ping test instead of checking internal state
-                # Internal state (_reader, _transport) might not be fully initialized yet
+                # Enhanced connection validation with multiple checks
                 try:
-                    # Test basic connectivity with a simple query
+                    # Check 1: Verify connection object internal state
+                    if not hasattr(raw_connection, '_reader') or raw_connection._reader is None:
+                        self.logger.warning(f"Connection has invalid _reader state (attempt {attempt + 1})")
+                        await raw_connection.ensure_closed()
+                        continue
+
+                    # Check 2: Verify transport state
+                    if (hasattr(raw_connection._reader, '_transport') and
+                            raw_connection._reader._transport is None):
+                        self.logger.warning(f"Connection has invalid transport state (attempt {attempt + 1})")
+                        await raw_connection.ensure_closed()
+                        continue
+
+                    # Check 3: Perform ping test to verify server-side connectivity
+                    await raw_connection.ping()
+
+                    # Check 4: Test with actual query execution
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute("SELECT 1")
                         result = await cursor.fetchone()
@@ -328,17 +406,27 @@ class DorisConnectionManager:
                             continue
 
                 except Exception as e:
-                    # Check if this is an at_eof error specifically
+                    # Enhanced error detection for connection issues
                     error_str = str(e).lower()
-                    if 'at_eof' in error_str or 'nonetype' in error_str:
-                        self.logger.warning(f"Connection has at_eof issue (attempt {attempt + 1}): {e}")
+
+                    # Check for various connection-related errors
+                    connection_error_keywords = [
+                        'at_eof', 'nonetype', 'connection', 'transport', 'reader',
+                        'lost connection', 'broken pipe', 'connection reset',
+                        'timed out', 'connection refused', 'host unreachable'
+                    ]
+
+                    is_connection_error = any(keyword in error_str for keyword in connection_error_keywords)
+
+                    if is_connection_error:
+                        self.logger.warning(f"Connection validation failed with connection error (attempt {attempt + 1}): {e}")
                     else:
-                        self.logger.warning(f"Connection test failed (attempt {attempt + 1}): {e}")
+                        self.logger.warning(f"Connection validation failed (attempt {attempt + 1}): {e}")
 
                     try:
                         await raw_connection.ensure_closed()
                     except Exception:
-                        pass
+                        pass  # Ignore cleanup errors
                     continue
 
             except Exception as e:
@@ -346,8 +434,10 @@ class DorisConnectionManager:
                 if attempt == max_retries - 1:
                     raise RuntimeError(f"Failed to create valid connection after {max_retries} attempts: {e}")
                 else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
+                    # Exponential backoff with jitter to avoid thundering herd
+                    base_delay = 0.5 * (2 ** attempt)
+                    jitter = base_delay * 0.1 * (0.5 - asyncio.get_running_loop().time() % 1)
+                    await asyncio.sleep(base_delay + jitter)
 
         raise RuntimeError("Failed to create valid connection")
 
@@ -505,41 +595,84 @@ class DorisConnectionManager:
             self.logger.error(f"Health check error: {e}")
 
     async def _perform_health_check(self):
-        """Perform enhanced health check"""
+        """Perform enhanced health check with aggressive stale connection detection"""
         try:
             unhealthy_sessions = []
+            stale_sessions = []
+            current_time = datetime.utcnow()
 
             # Enhanced health check with comprehensive validation
             for session_id, conn in self.session_connections.items():
-                if not await self._comprehensive_connection_health_check(conn):
-                    unhealthy_sessions.append(session_id)
-
-            # Check for stale connections (over 30 minutes old)
-            current_time = datetime.utcnow()
-            stale_sessions = []
-            for session_id, conn in self.session_connections.items():
-                if session_id not in unhealthy_sessions:  # Don't double-check
-                    last_used_delta = (current_time - conn.last_used).total_seconds()
-                    if last_used_delta > 1800:  # 30 minutes
-                        # Force a comprehensive health check for stale connections
-                        if not await self._comprehensive_connection_health_check(conn):
+                try:
+                    # Check 1: Basic connection health
+                    if not await self._comprehensive_connection_health_check(conn):
+                        unhealthy_sessions.append(session_id)
+                        self.logger.debug(f"Session {session_id} marked as unhealthy")
+                        continue
+
+                    # Check 2: Stale connection detection (much more aggressive)
+                    time_since_last_use = (current_time - conn.last_used).total_seconds()
+                    connection_age = (current_time - conn.created_at).total_seconds()
+
+                    # Mark as stale if:
+                    # 1. Last used more than 15 minutes ago, OR
+                    # 2. Connection age exceeds maximum age, OR
+                    # 3. Connection hasn't been used in a while and is old
+                    if (time_since_last_use > self.stale_connection_threshold or
+                            connection_age > self.max_connection_age or
+                            (time_since_last_use > 300 and connection_age > 1800)):  # 5 min unused + 30 min old
+
+                        # For stale connections, do an extra validation
+                        try:
+                            # Try a more aggressive ping test
+                            async with conn.connection.cursor() as cursor:
+                                await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
+                                await asyncio.wait_for(cursor.fetchone(), timeout=3)
+                            # If we get here, connection is actually healthy despite being stale
+                            self.logger.debug(f"Stale connection {session_id} passed extra validation")
+                        except Exception as stale_test_error:
                             stale_sessions.append(session_id)
+                            self.logger.debug(f"Session {session_id} marked as stale: {stale_test_error}")
+                            continue
+
+                except Exception as check_error:
+                    # If we can't even check the connection, it's definitely problematic
+                    self.logger.warning(f"Health check failed for session {session_id}: {check_error}")
+                    unhealthy_sessions.append(session_id)
 
             all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions))
 
             # Clean up problematic connections
+            cleanup_results = {"success": 0, "failed": 0}
             for session_id in all_problematic_sessions:
-                await self._cleanup_session_connection(session_id)
-                self.metrics.failed_connections += 1
+                try:
+                    await self._cleanup_session_connection(session_id)
+                    cleanup_results["success"] += 1
+                    self.metrics.failed_connections += 1
+                except Exception as cleanup_error:
+                    cleanup_results["failed"] += 1
+                    self.logger.error(f"Failed to cleanup session {session_id}: {cleanup_error}")
 
             # Update metrics
             await self._update_connection_metrics()
             self.metrics.last_health_check = datetime.utcnow()
 
+            # Log results
             if all_problematic_sessions:
-                self.logger.warning(f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and {len(stale_sessions)} stale connections")
+                self.logger.warning(
+                    f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and "
+                    f"{len(stale_sessions)} stale connections "
+                    f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})"
+                )
             else:
                 self.logger.debug(f"Health check: all {len(self.session_connections)} connections healthy")
+
+            # If we have a lot of connection failures, log some diagnostic info
+            if self.metrics.connection_errors > 50:  # Threshold for diagnostic logging
+                self.logger.warning(
+                    f"High connection error count detected: {self.metrics.connection_errors}. "
+                    f"This may indicate persistent connectivity issues with the database."
+                )
 
         except Exception as e:
             self.logger.error(f"Health check failed: {e}")
@@ -551,10 +684,11 @@ class DorisConnectionManager:
                 pass  # Don't let diagnosis failure crash health check
 
     async def _cleanup_loop(self):
-        """Background cleanup loop"""
+        """Background cleanup loop with more frequent execution"""
         while True:
             try:
-                await asyncio.sleep(300)  # Run every 5 minutes
+                # Run cleanup more frequently - every 2 minutes instead of 5
+                await asyncio.sleep(120)  # Run every 2 minutes
                 await self._cleanup_idle_connections()
             except asyncio.CancelledError:
                 break
@@ -562,22 +696,69 @@ class DorisConnectionManager:
                 self.logger.error(f"Cleanup loop error: {e}")
 
     async def _cleanup_idle_connections(self):
-        """Clean up idle connections"""
+        """Clean up idle connections with more aggressive criteria"""
         current_time = datetime.utcnow()
         idle_sessions = []
 
         for session_id, conn in self.session_connections.items():
-            # Check if connection has exceeded maximum age
-            age = (current_time - conn.created_at).total_seconds()
-            if age > self.max_connection_age:
+            try:
+                # Enhanced idle connection detection
+                connection_age = (current_time - conn.created_at).total_seconds()
+                time_since_last_use = (current_time - conn.last_used).total_seconds()
+
+                # Mark as idle if:
+                # 1. Connection has exceeded maximum age, OR
+                # 2. Connection hasn't been used for more than 20 minutes, OR
+                # 3. Connection is old and hasn't been used recently
+                should_cleanup = (
+                    connection_age > self.max_connection_age or
+                    time_since_last_use > 1200 or  # 20 minutes unused
+                    (connection_age > 1800 and time_since_last_use > 600)  # 30 min old + 10 min unused
+                )
+
+                if should_cleanup:
+                    # Before marking for cleanup, try a quick health check
+                    try:
+                        # Quick validation - if this fails, definitely cleanup
+                        if not conn.connection or conn.connection.closed:
+                            idle_sessions.append(session_id)
+                            continue
+
+                        # Quick ping test with timeout
+                        await asyncio.wait_for(conn.connection.ping(), timeout=2)
+
+                        # If ping succeeds but connection is still very old, cleanup anyway
+                        if connection_age > self.max_connection_age:
+                            idle_sessions.append(session_id)
+                            self.logger.debug(f"Cleaning up old but healthy connection for session {session_id}")
+                        else:
+                            self.logger.debug(f"Keeping healthy connection for session {session_id}")
+
+                    except Exception as health_error:
+                        # Health check failed, definitely cleanup
+                        idle_sessions.append(session_id)
+                        self.logger.debug(f"Cleanup marking session {session_id} due to health check failure: {health_error}")
+
+            except Exception as e:
+                self.logger.warning(f"Error checking connection {session_id} for cleanup: {e}")
+                # If we can't even check it, it's probably broken
                 idle_sessions.append(session_id)
 
         # Clean up idle connections
+        cleanup_results = {"success": 0, "failed": 0}
         for session_id in idle_sessions:
-            await self._cleanup_session_connection(session_id)
+            try:
+                await self._cleanup_session_connection(session_id)
+                cleanup_results["success"] += 1
+            except Exception as cleanup_error:
+                cleanup_results["failed"] += 1
+                self.logger.error(f"Failed to cleanup idle session {session_id}: {cleanup_error}")
 
         if idle_sessions:
-            self.logger.info(f"Cleaned up {len(idle_sessions)} idle connections")
+            self.logger.info(
+                f"Cleaned up {len(idle_sessions)} idle connections "
+                f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})"
+            )
 
     async def _update_connection_metrics(self):
         """Update connection metrics"""
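For reviewers, the commit is easier to follow as a standalone pattern. Below is a minimal sketch of the two techniques it combines: pre-flight validation of aiomysql's internal reader/transport state (the source of the "'NoneType' object has no attribute 'at_eof'" error), and jittered exponential backoff when acquiring a pooled connection. The helper names `is_connection_usable` and `acquire_checked_connection` are illustrative, not identifiers from this commit, and relying on aiomysql's private `_reader`/`_transport` attributes is an assumption that may break across aiomysql versions.

import asyncio

import aiomysql


async def is_connection_usable(conn) -> bool:
    # Illustrative helper (not from the commit): condensed pre-flight checks.
    # A missing or closed connection can never be used.
    if conn is None or conn.closed:
        return False

    # Assumption: aiomysql keeps its asyncio StreamReader in the private
    # attribute `_reader`; a torn-down reader/transport is exactly the state
    # that later raises "'NoneType' object has no attribute 'at_eof'".
    reader = getattr(conn, "_reader", None)
    if reader is None or getattr(reader, "_transport", None) is None:
        return False

    try:
        if reader.at_eof():
            # Stream already hit EOF: the server closed its side.
            return False
        # Round-trip to the server, bounded so a dead peer fails fast.
        await asyncio.wait_for(conn.ping(), timeout=5)
        async with conn.cursor() as cursor:
            await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
            row = await cursor.fetchone()
            return bool(row) and row[0] == 1
    except Exception:
        return False


async def acquire_checked_connection(pool: aiomysql.Pool, max_retries: int = 3):
    # Illustrative helper: acquire from the pool, retrying with jittered backoff.
    for attempt in range(max_retries):
        try:
            conn = await pool.acquire()
            if await is_connection_usable(conn):
                return conn
            # Discard the broken connection rather than returning it to the pool,
            # mirroring the commit's ensure_closed-and-retry pattern.
            await conn.ensure_closed()
        except Exception:
            if attempt == max_retries - 1:
                raise
        # Exponential backoff (0.5s, 1s, 2s, ...) with a small jitter taken from
        # the event-loop clock, as in the commit, so concurrent retries do not
        # all wake at once.
        base_delay = 0.5 * (2 ** attempt)
        jitter = base_delay * 0.1 * (0.5 - asyncio.get_running_loop().time() % 1)
        await asyncio.sleep(base_delay + jitter)
    raise RuntimeError("Failed to acquire a usable connection")

The ordering in the first helper matters: probing `_reader` before calling `ping()` lets the check return False instead of tripping the same AttributeError inside aiomysql that the commit works around.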
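The recycling thresholds in the diff (pool_recycle=1800, a 900-second stale threshold) only help if they stay below the server's idle cutoff. A quick way to sanity-check that against a live endpoint is sketched below; it assumes Doris's MySQL-compatible `SHOW VARIABLES` and a `wait_timeout` session variable, and `check_recycle_margin` is a hypothetical name, not part of the commit.

async def check_recycle_margin(pool, pool_recycle: int = 1800) -> None:
    # Illustrative check: warn if pool_recycle is not comfortably below the
    # server-side wait_timeout that silently kills idle connections.
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SHOW VARIABLES LIKE 'wait_timeout'")
            row = await cursor.fetchone()
    wait_timeout = int(row[1])
    if pool_recycle >= wait_timeout:
        print(f"pool_recycle={pool_recycle}s >= wait_timeout={wait_timeout}s: "
              f"idle connections may die before they are recycled")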
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
