This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to tag 0.5.0
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git

commit aa953e9fe1f30124300a7aa704a56febec0046e2
Author: FreeOnePlus <[email protected]>
AuthorDate: Thu Jul 3 21:31:54 2025 +0800

    at_eof bug fix
---
 doris_mcp_server/utils/db.py | 295 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 238 insertions(+), 57 deletions(-)

diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index e04b4df..0ef6834 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -70,6 +70,7 @@ class DorisConnection:
         self.query_count = 0
         self.is_healthy = True
         self.security_manager = security_manager
+        self.logger = logging.getLogger(__name__)
 
     async def execute(self, sql: str, params: tuple | None = None, auth_context=None) -> QueryResult:
         """Execute SQL query"""
@@ -135,31 +136,88 @@ class DorisConnection:
             raise
 
     async def ping(self) -> bool:
-        """Check connection health status"""
+        """Check connection health status with enhanced at_eof error detection"""
         try:
-            # Check if connection exists and is not closed
+            # Check 1: Connection exists and is not closed
             if not self.connection or self.connection.closed:
                 self.is_healthy = False
                 return False
             
-            # Check if connection has _reader (aiomysql internal state)
-            # This prevents the 'NoneType' object has no attribute 'at_eof' error
+            # Check 2: Comprehensive internal state validation
+            # This is critical for detecting at_eof issues before they cause errors
             if not hasattr(self.connection, '_reader') or self.connection._reader is None:
+                self.logger.debug(f"Connection {self.session_id} has invalid _reader state")
+                self.is_healthy = False
+                return False
+            
+            # Check 3: Verify transport state
+            if (hasattr(self.connection._reader, '_transport') and 
+                self.connection._reader._transport is None):
+                self.logger.debug(f"Connection {self.session_id} has invalid transport state")
+                self.is_healthy = False
+                return False
+            
+            # Check 4: Additional stream state validation
+            if (hasattr(self.connection._reader, 'at_eof') and 
+                callable(self.connection._reader.at_eof)):
+                try:
+                    # If the stream is already at EOF, the connection is broken
+                    if self.connection._reader.at_eof():
+                        self.logger.debug(f"Connection {self.session_id} reader is at EOF")
+                        self.is_healthy = False
+                        return False
+                except Exception:
+                    # If we can't even check at_eof, the connection is problematic
+                    self.logger.debug(f"Connection {self.session_id} cannot check at_eof state")
+                    self.is_healthy = False
+                    return False
+            
+            # Check 5: Try to ping the connection with timeout
+            try:
+                await asyncio.wait_for(self.connection.ping(), timeout=5)
+            except asyncio.TimeoutError:
+                self.logger.debug(f"Connection {self.session_id} ping timeout")
+                self.is_healthy = False
+                return False
+            except Exception as ping_error:
+                # Check for specific error patterns
+                error_str = str(ping_error).lower()
+                if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']):
+                    self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {ping_error}")
+                else:
+                    self.logger.debug(f"Connection {self.session_id} ping failed: {ping_error}")
                 self.is_healthy = False
                 return False
             
-            # Additional check for reader's state
-            if hasattr(self.connection._reader, '_transport') and self.connection._reader._transport is None:
+            # Check 6: Final validation with a simple query
+            try:
+                async with self.connection.cursor() as cursor:
+                    await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
+                    result = await asyncio.wait_for(cursor.fetchone(), timeout=3)
+                    if not result or result[0] != 1:
+                        self.logger.debug(f"Connection {self.session_id} test query returned invalid result")
+                        self.is_healthy = False
+                        return False
+            except Exception as query_error:
+                error_str = str(query_error).lower()
+                if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']):
+                    self.logger.debug(f"Connection {self.session_id} test query failed with connection state error: {query_error}")
+                else:
+                    self.logger.debug(f"Connection {self.session_id} test query failed: {query_error}")
                 self.is_healthy = False
                 return False
             
-            # Try to ping the connection
-            await self.connection.ping()
+            # If all checks pass, the connection is healthy
             self.is_healthy = True
             return True
-        except (AttributeError, OSError, ConnectionError, Exception) as e:
-            # Log the specific error for debugging
-            logging.debug(f"Connection ping failed for session {self.session_id}: {e}")
+            
+        except Exception as e:
+            # Any uncaught exception means the connection is not healthy
+            error_str = str(e).lower()
+            if any(keyword in error_str for keyword in ['at_eof', 'nonetype', 'reader', 'transport']):
+                self.logger.debug(f"Connection {self.session_id} ping failed with connection state error: {e}")
+            else:
+                self.logger.debug(f"Connection {self.session_id} ping failed with unexpected error: {e}")
             self.is_healthy = False
             return False
 
@@ -188,11 +246,15 @@ class DorisConnectionManager:
         self.logger = logging.getLogger(__name__)
         self.security_manager = security_manager
 
-        # Health check configuration
-        self.health_check_interval = config.database.health_check_interval or 60
+        # Enhanced health check configuration for long-connection issues
+        # Reduce health check interval to detect stale connections faster
+        self.health_check_interval = min(config.database.health_check_interval or 60, 30)  # Max 30 seconds
         self.max_connection_age = config.database.max_connection_age or 3600
         self.connection_timeout = config.database.connection_timeout or 30
-
+        
+        # Add stale connection detection threshold (much shorter than MySQL's wait_timeout)
+        self.stale_connection_threshold = 900  # 15 minutes - connections older than this are considered stale
+        
         # Start background tasks
         self._health_check_task = None
         self._cleanup_task = None
@@ -210,8 +272,11 @@ class DorisConnectionManager:
             if not self.config.database.password:
                 self.logger.warning("Database password is empty, this may cause connection issues")
             
-            # Create connection pool with improved stability parameters
-            # Key change: Set minsize=0 to avoid pre-creation issues that cause at_eof errors
+            # Create connection pool with aggressive connection recycling to prevent at_eof issues
+            # Key changes:
+            # 1. Reduce pool_recycle to 30 minutes (1800 seconds) - much shorter than MySQL's wait_timeout
+            # 2. Add shorter connect_timeout to fail fast on bad connections
+            # 3. Enable autocommit to avoid transaction state issues
             self.pool = await aiomysql.create_pool(
                 host=self.config.database.host,
                 port=self.config.database.port,
@@ -220,12 +285,11 @@ class DorisConnectionManager:
                 db=self.config.database.database,
                 charset="utf8",
                 minsize=self.config.database.min_connections,  # Always 0 per configuration to avoid at_eof issues
-
                 maxsize=self.config.database.max_connections or 20,
                 autocommit=True,
-                connect_timeout=self.connection_timeout,
-                # Enhanced stability parameters
-                pool_recycle=7200,  # Recycle connections every 2 hours
+                connect_timeout=15,  # Shorter timeout to fail fast
+                # Aggressive connection recycling to prevent stale connections
+                pool_recycle=1800,  # Recycle connections every 30 minutes instead of 2 hours
                 echo=False,  # Don't echo SQL statements
             )
 
@@ -234,13 +298,12 @@ class DorisConnectionManager:
                 raise RuntimeError("Connection pool robust test failed")
 
             self.logger.info(
-                f"Connection pool initialized successfully with on-demand connection creation, "
+                f"Connection pool initialized successfully with aggressive recycling (30min), "
                 f"min connections: {self.config.database.min_connections}, "
-
                 f"max connections: {self.config.database.max_connections or 20}"
             )

-            # Start background monitoring tasks
+            # Start background monitoring tasks with more frequent health checks
             self._health_check_task = asyncio.create_task(self._health_check_loop())
             self._cleanup_task = asyncio.create_task(self._cleanup_loop())
 
@@ -312,10 +375,25 @@ class DorisConnectionManager:
                     self.logger.warning(f"Pool returned closed connection (attempt {attempt + 1})")
                     continue
                 
-                # Perform a simple ping test instead of checking internal state
-                # Internal state (_reader, _transport) might not be fully initialized yet
+                # Enhanced connection validation with multiple checks
                 try:
-                    # Test basic connectivity with a simple query
+                    # Check 1: Verify connection object internal state
+                    if not hasattr(raw_connection, '_reader') or raw_connection._reader is None:
+                        self.logger.warning(f"Connection has invalid _reader state (attempt {attempt + 1})")
+                        await raw_connection.ensure_closed()
+                        continue
+                        
+                    # Check 2: Verify transport state
+                    if (hasattr(raw_connection._reader, '_transport') and 
+                        raw_connection._reader._transport is None):
+                        self.logger.warning(f"Connection has invalid transport state (attempt {attempt + 1})")
+                        await raw_connection.ensure_closed()
+                        continue
+                    
+                    # Check 3: Perform ping test to verify server-side connectivity
+                    await raw_connection.ping()
+                    
+                    # Check 4: Test with actual query execution
                     async with raw_connection.cursor() as cursor:
                         await cursor.execute("SELECT 1")
                         result = await cursor.fetchone()
@@ -328,17 +406,27 @@ class DorisConnectionManager:
                             continue
                             
                 except Exception as e:
-                    # Check if this is an at_eof error specifically
+                    # Enhanced error detection for connection issues
                     error_str = str(e).lower()
-                    if 'at_eof' in error_str or 'nonetype' in error_str:
-                        self.logger.warning(f"Connection has at_eof issue (attempt {attempt + 1}): {e}")
+                    
+                    # Check for various connection-related errors
+                    connection_error_keywords = [
+                        'at_eof', 'nonetype', 'connection', 'transport', 'reader',
+                        'lost connection', 'broken pipe', 'connection reset',
+                        'timed out', 'connection refused', 'host unreachable'
+                    ]
+                    
+                    is_connection_error = any(keyword in error_str for keyword in connection_error_keywords)
+                    
+                    if is_connection_error:
+                        self.logger.warning(f"Connection validation failed with connection error (attempt {attempt + 1}): {e}")
                     else:
-                        self.logger.warning(f"Connection test failed (attempt {attempt + 1}): {e}")
+                        self.logger.warning(f"Connection validation failed (attempt {attempt + 1}): {e}")
                     
                     try:
                         await raw_connection.ensure_closed()
                     except Exception:
-                        pass
+                        pass  # Ignore cleanup errors
                     continue
                 
             except Exception as e:
@@ -346,8 +434,10 @@ class DorisConnectionManager:
                 if attempt == max_retries - 1:
                     raise RuntimeError(f"Failed to create valid connection after {max_retries} attempts: {e}")
                 else:
-                    # Exponential backoff
-                    await asyncio.sleep(0.5 * (2 ** attempt))
+                    # Exponential backoff with jitter to avoid thundering herd
+                    base_delay = 0.5 * (2 ** attempt)
+                    jitter = base_delay * 0.1 * (0.5 - asyncio.get_running_loop().time() % 1)
+                    await asyncio.sleep(base_delay + jitter)
         
         raise RuntimeError("Failed to create valid connection")
 
@@ -505,41 +595,84 @@ class DorisConnectionManager:
                 self.logger.error(f"Health check error: {e}")
 
     async def _perform_health_check(self):
-        """Perform enhanced health check"""
+        """Perform enhanced health check with aggressive stale connection detection"""
         try:
             unhealthy_sessions = []
+            stale_sessions = []
+            current_time = datetime.utcnow()
             
             # Enhanced health check with comprehensive validation
             for session_id, conn in self.session_connections.items():
-                if not await self._comprehensive_connection_health_check(conn):
-                    unhealthy_sessions.append(session_id)
-            
-            # Check for stale connections (over 30 minutes old)
-            current_time = datetime.utcnow()
-            stale_sessions = []
-            for session_id, conn in self.session_connections.items():
-                if session_id not in unhealthy_sessions:  # Don't double-check
-                    last_used_delta = (current_time - conn.last_used).total_seconds()
-                    if last_used_delta > 1800:  # 30 minutes
-                        # Force a comprehensive health check for stale connections
-                        if not await self._comprehensive_connection_health_check(conn):
+                try:
+                    # Check 1: Basic connection health
+                    if not await self._comprehensive_connection_health_check(conn):
+                        unhealthy_sessions.append(session_id)
+                        self.logger.debug(f"Session {session_id} marked as unhealthy")
+                        continue
+                    
+                    # Check 2: Stale connection detection (much more aggressive)
+                    time_since_last_use = (current_time - conn.last_used).total_seconds()
+                    connection_age = (current_time - conn.created_at).total_seconds()
+                    
+                    # Mark as stale if:
+                    # 1. Last used more than 15 minutes ago, OR
+                    # 2. Connection age exceeds maximum age, OR  
+                    # 3. Connection hasn't been used in a while and is old
+                    if (time_since_last_use > self.stale_connection_threshold or
+                        connection_age > self.max_connection_age or
+                        (time_since_last_use > 300 and connection_age > 1800)):  # 5 min unused + 30 min old
+                        
+                        # For stale connections, do an extra validation
+                        try:
+                            # Try a more aggressive ping test
+                            async with conn.connection.cursor() as cursor:
+                                await asyncio.wait_for(cursor.execute("SELECT 1"), timeout=3)
+                                await asyncio.wait_for(cursor.fetchone(), timeout=3)
+                            # If we get here, connection is actually healthy despite being stale
+                            self.logger.debug(f"Stale connection {session_id} passed extra validation")
+                        except Exception as stale_test_error:
                             stale_sessions.append(session_id)
+                            self.logger.debug(f"Session {session_id} marked as stale: {stale_test_error}")
+                            continue
+                    
+                except Exception as check_error:
+                    # If we can't even check the connection, it's definitely problematic
+                    self.logger.warning(f"Health check failed for session {session_id}: {check_error}")
+                    unhealthy_sessions.append(session_id)
             
             all_problematic_sessions = list(set(unhealthy_sessions + stale_sessions))
             
             # Clean up problematic connections
+            cleanup_results = {"success": 0, "failed": 0}
             for session_id in all_problematic_sessions:
-                await self._cleanup_session_connection(session_id)
-                self.metrics.failed_connections += 1
+                try:
+                    await self._cleanup_session_connection(session_id)
+                    cleanup_results["success"] += 1
+                    self.metrics.failed_connections += 1
+                except Exception as cleanup_error:
+                    cleanup_results["failed"] += 1
+                    self.logger.error(f"Failed to cleanup session {session_id}: {cleanup_error}")
             
             # Update metrics
             await self._update_connection_metrics()
             self.metrics.last_health_check = datetime.utcnow()
             
+            # Log results
             if all_problematic_sessions:
-                self.logger.warning(f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and {len(stale_sessions)} stale connections")
+                self.logger.warning(
+                    f"Health check: cleaned up {len(unhealthy_sessions)} unhealthy and "
+                    f"{len(stale_sessions)} stale connections "
+                    f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})"
+                )
             else:
                 self.logger.debug(f"Health check: all {len(self.session_connections)} connections healthy")
+            
+            # If we have a lot of connection failures, log some diagnostic info
+            if self.metrics.connection_errors > 50:  # Threshold for diagnostic logging
+                self.logger.warning(
+                    f"High connection error count detected: {self.metrics.connection_errors}. "
+                    f"This may indicate persistent connectivity issues with the database."
+                )
 
         except Exception as e:
             self.logger.error(f"Health check failed: {e}")
@@ -551,10 +684,11 @@ class DorisConnectionManager:
                 pass  # Don't let diagnosis failure crash health check
 
     async def _cleanup_loop(self):
-        """Background cleanup loop"""
+        """Background cleanup loop with more frequent execution"""
         while True:
             try:
-                await asyncio.sleep(300)  # Run every 5 minutes
+                # Run cleanup more frequently - every 2 minutes instead of 5
+                await asyncio.sleep(120)  # Run every 2 minutes
                 await self._cleanup_idle_connections()
             except asyncio.CancelledError:
                 break
@@ -562,22 +696,69 @@ class DorisConnectionManager:
                 self.logger.error(f"Cleanup loop error: {e}")
 
     async def _cleanup_idle_connections(self):
-        """Clean up idle connections"""
+        """Clean up idle connections with more aggressive criteria"""
         current_time = datetime.utcnow()
         idle_sessions = []
         
         for session_id, conn in self.session_connections.items():
-            # Check if connection has exceeded maximum age
-            age = (current_time - conn.created_at).total_seconds()
-            if age > self.max_connection_age:
+            try:
+                # Enhanced idle connection detection
+                connection_age = (current_time - conn.created_at).total_seconds()
+                time_since_last_use = (current_time - conn.last_used).total_seconds()
+                
+                # Mark as idle if:
+                # 1. Connection has exceeded maximum age, OR
+                # 2. Connection hasn't been used for more than 20 minutes, OR
+                # 3. Connection is old and hasn't been used recently
+                should_cleanup = (
+                    connection_age > self.max_connection_age or
+                    time_since_last_use > 1200 or  # 20 minutes unused
+                    (connection_age > 1800 and time_since_last_use > 600)  # 30 min old + 10 min unused
+                )
+                
+                if should_cleanup:
+                    # Before marking for cleanup, try a quick health check
+                    try:
+                        # Quick validation - if this fails, definitely cleanup
+                        if not conn.connection or conn.connection.closed:
+                            idle_sessions.append(session_id)
+                            continue
+                            
+                        # Quick ping test with timeout
+                        await asyncio.wait_for(conn.connection.ping(), timeout=2)
+                        
+                        # If ping succeeds but connection is still very old, cleanup anyway
+                        if connection_age > self.max_connection_age:
+                            idle_sessions.append(session_id)
+                            self.logger.debug(f"Cleaning up old but healthy connection for session {session_id}")
+                        else:
+                            self.logger.debug(f"Keeping healthy connection for session {session_id}")
+                            
+                    except Exception as health_error:
+                        # Health check failed, definitely cleanup
+                        idle_sessions.append(session_id)
+                        self.logger.debug(f"Cleanup marking session {session_id} due to health check failure: {health_error}")
+                        
+            except Exception as e:
+                self.logger.warning(f"Error checking connection {session_id} for cleanup: {e}")
+                # If we can't even check it, it's probably broken
                 idle_sessions.append(session_id)
         
         # Clean up idle connections
+        cleanup_results = {"success": 0, "failed": 0}
         for session_id in idle_sessions:
-            await self._cleanup_session_connection(session_id)
+            try:
+                await self._cleanup_session_connection(session_id)
+                cleanup_results["success"] += 1
+            except Exception as cleanup_error:
+                cleanup_results["failed"] += 1
+                self.logger.error(f"Failed to cleanup idle session {session_id}: {cleanup_error}")
         
         if idle_sessions:
-            self.logger.info(f"Cleaned up {len(idle_sessions)} idle connections")
+            self.logger.info(
+                f"Cleaned up {len(idle_sessions)} idle connections "
+                f"(success: {cleanup_results['success']}, failed: {cleanup_results['failed']})"
+            )
 
     async def _update_connection_metrics(self):
         """Update connection metrics"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
