This is an automated email from the ASF dual-hosted git repository.

freeoneplus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-mcp-server.git


The following commit(s) were added to refs/heads/master by this push:
     new 43143f0  feat: add batch SQL execution support for MCP (#70)
43143f0 is described below

commit 43143f0b30b16ce6315b4f5106d0951d81fba251
Author: zzzzwc <[email protected]>
AuthorDate: Wed Dec 24 12:45:29 2025 +0800

    feat: add batch SQL execution support for MCP (#70)
    
    * feat: add batch SQL execution support for MCP
    
    - Add sql field to QueryResult to track the executed query
    - Implement execute_batch_sqls_for_mcp for executing multiple SQL statements
    - Use sqlparse to split and execute multiple SQL statements in a single request
    - Improve error handling in execute_batch_queries
    - Return a multiple-results format when batch queries are detected
    
    * test: add multi-SQL statements test for query executor
---
 doris_mcp_server/utils/db.py             |  2 +
 doris_mcp_server/utils/query_executor.py | 71 +++++++++++++++++++++++++++++---
 test/utils/test_query_executor.py        | 70 +++++++++++++++++++++++++++++++
 3 files changed, 137 insertions(+), 6 deletions(-)
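
The commit message notes that sqlparse is used to split a multi-statement request
before dispatch. Below is a minimal standalone sketch of that splitting step; the
sample SQL and variable names are illustrative, and only the third-party sqlparse
package itself comes from this change.

    # Sketch of the statement splitting the change relies on.
    # Requires the third-party package: pip install sqlparse
    import sqlparse

    multi_sql = """
        SELECT id, name FROM users WHERE id = 1;
        SELECT COUNT(*) AS cnt FROM users;
    """

    # sqlparse.split() yields one string per statement; empty fragments are
    # filtered out, mirroring the check added in query_executor.py below.
    statements = [s.strip() for s in sqlparse.split(multi_sql) if s.strip()]

    # Typically prints two statements, each still ending with its semicolon.
    for stmt in statements:
        print(stmt)

Splitting up front lets each statement become its own QueryRequest, which is what
the new execute_batch_sqls_for_mcp method in the diff below does.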

diff --git a/doris_mcp_server/utils/db.py b/doris_mcp_server/utils/db.py
index 7d2acb4..8ddf3f3 100644
--- a/doris_mcp_server/utils/db.py
+++ b/doris_mcp_server/utils/db.py
@@ -59,6 +59,7 @@ class QueryResult:
     metadata: dict[str, Any]
     execution_time: float
     row_count: int
+    sql: str
 
 
 class DorisConnection:
@@ -132,6 +133,7 @@ class DorisConnection:
                     metadata=metadata,
                     execution_time=execution_time,
                     row_count=row_count,
+                    sql=sql
                 )
 
         except Exception as e:
diff --git a/doris_mcp_server/utils/query_executor.py b/doris_mcp_server/utils/query_executor.py
index cf2fc28..1e15c2c 100644
--- a/doris_mcp_server/utils/query_executor.py
+++ b/doris_mcp_server/utils/query_executor.py
@@ -33,6 +33,8 @@ from datetime import datetime, timedelta, date
 from typing import Any, Dict
 from decimal import Decimal
 
+import sqlparse
+
 from .db import DorisConnectionManager, QueryResult
 from .logger import get_logger
 from .sql_security_utils import get_auth_context
@@ -468,6 +470,51 @@ class DorisQueryExecutor:
                 f"Slow query detected: {execution_time:.2f}s (threshold: {self.slow_query_threshold}s)"
             )
 
+    async def execute_batch_sqls_for_mcp(
+            self, sqls: list[str],
+            timeout: int = 30,
+            session_id: str = "mcp_session",
+            user_id: str = "mcp_user",
+            auth_context=None
+    ) -> dict[str, Any]:
+        """Execute multiple sqls in batch"""
+        if not sqls:
+            return {
+                "success": False,
+                "error": "SQL query is required",
+                "data": None
+            }
+        query_requests = [
+            QueryRequest(
+                sql=sql,
+                session_id=session_id,
+                user_id=user_id,
+                timeout=timeout,
+                cache_enabled=False
+            )
+            for sql in sqls
+        ]
+        query_results = await self.execute_batch_queries(query_requests, auth_context)
+        # Serialize data for JSON response
+        results = [
+            {
+                "data": [self._serialize_row_data(data) for data in result.data],
+                "row_count": result.row_count,
+                "execution_time": result.execution_time,
+                "metadata": {
+                    "columns": result.metadata.get("columns", []),
+                    "query": result.sql
+                }
+            }
+            for result in query_results
+        ]
+
+        return {
+            "success": True,
+            "multiple_results": True,
+            "results": results
+        }
+
     async def execute_batch_queries(
         self, query_requests: list[QueryRequest], auth_context=None
     ) -> list[QueryResult]:
@@ -485,13 +532,16 @@ class DorisQueryExecutor:
             self.execute_query(request, auth_context) for request in query_requests
         ]
 
-        try:
-            results = await asyncio.gather(*tasks, return_exceptions=True)
-        except Exception as e:
-            self.logger.error(f"Batch query execution failed: {e}")
-            raise
+        query_results = []
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        for result in results:
+            if isinstance(result, Exception):
+                self.logger.error(f"Batch query execution failed: {result}")
+                raise result
+            else:
+                query_results.append(result)
 
-        return results
+        return query_results
 
     async def explain_query(self, sql: str, session_id: str) -> dict[str, Any]:
         """Get query execution plan"""
@@ -635,6 +685,15 @@ class DorisQueryExecutor:
                         sql = sql[:-1]
                     sql = f"{sql} LIMIT {limit}"
                 
+                all_statements = [
+                    s.strip()
+                    for s in sqlparse.split(sql)
+                    if s.strip()
+                ]
+                if len(all_statements) > 1:
+                    return await self.execute_batch_sqls_for_mcp(sqls=all_statements, timeout=timeout,
+                                                                 session_id=session_id, user_id=user_id,
+                                                                 auth_context=auth_context)
                 # Create query request
                 query_request = QueryRequest(
                     sql=sql,
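
For anyone consuming the tool from an MCP client, the multiple-results payload
produced by execute_batch_sqls_for_mcp can be read off the diff above; the sketch
below simply restates its shape as a Python literal, with invented row values,
column names, timings, and SQL text.

    # Illustrative shape of the response on the multiple-statements path.
    # Field names follow the diff above; the values are made up.
    example_response = {
        "success": True,
        "multiple_results": True,
        "results": [
            {
                "data": [{"id": 1, "name": "Alice"}],
                "row_count": 1,
                "execution_time": 0.10,
                "metadata": {
                    "columns": ["id", "name"],
                    "query": "SELECT id, name FROM users WHERE id = 1",
                },
            },
            {
                "data": [{"cnt": 100}],
                "row_count": 1,
                "execution_time": 0.08,
                "metadata": {
                    "columns": ["cnt"],
                    "query": "SELECT COUNT(*) AS cnt FROM users",
                },
            },
        ],
    }

A single-statement request is unaffected: only when more than one statement is
detected does the executor return this format instead of the existing single-result
format.
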
diff --git a/test/utils/test_query_executor.py b/test/utils/test_query_executor.py
index a9a9b79..ef3c958 100644
--- a/test/utils/test_query_executor.py
+++ b/test/utils/test_query_executor.py
@@ -201,3 +201,73 @@ class TestDorisQueryExecutor:
             if result["success"]:
                 assert "data" in result
                 assert "row_count" in result 
+
+    @pytest.mark.asyncio
+    async def test_execute_multi_sql_statements(self, query_executor):
+        """Test execution of multiple SQL statements"""
+        from doris_mcp_server.utils.query_executor import QueryResult
+        
+        # Disable security check for this test
+        query_executor.connection_manager.config.security.enable_security_check = False
+        
+        with patch.object(query_executor, 'execute_query') as mock_execute:
+            # Mock results for three SQL statements
+            mock_execute.side_effect = [
+                QueryResult(
+                    data=[{"id": 1, "name": "张三"}],
+                    row_count=1,
+                    execution_time=0.1,
+                    sql="SELECT id, name FROM users WHERE id = 1",
+                    metadata={"columns": ["id", "name"]}
+                ),
+                QueryResult(
+                    data=[{"id": 2, "name": "李四"}],
+                    row_count=1,
+                    execution_time=0.12,
+                    sql="SELECT id, name FROM users WHERE id = 2",
+                    metadata={"columns": ["id", "name"]}
+                ),
+                QueryResult(
+                    data=[{"count": 100}],
+                    row_count=1,
+                    execution_time=0.08,
+                    sql="SELECT COUNT(*) as count FROM users",
+                    metadata={"columns": ["count"]}
+                )
+            ]
+            
+            # Execute multiple SQL statements separated by semicolons
+            multi_sql = """
+                SELECT id, name FROM users WHERE id = 1;
+                SELECT id, name FROM users WHERE id = 2;
+                SELECT COUNT(*) as count FROM users;
+            """
+            
+            result = await query_executor.execute_sql_for_mcp(multi_sql)
+            
+            # Verify the result structure for multiple statements
+            assert result["success"] is True
+            assert result["multiple_results"] is True
+            assert "results" in result
+            assert len(result["results"]) == 3
+            
+            # Verify first query result
+            assert result["results"][0]["data"] == [{"id": 1, "name": "张三"}]
+            assert result["results"][0]["row_count"] == 1
+            assert result["results"][0]["metadata"]["columns"] == ["id", "name"]
+            assert result["results"][0]["metadata"]["query"] == "SELECT id, name FROM users WHERE id = 1"
+            
+            # Verify second query result
+            assert result["results"][1]["data"] == [{"id": 2, "name": "李四"}]
+            assert result["results"][1]["row_count"] == 1
+            assert result["results"][1]["metadata"]["columns"] == ["id", "name"]
+            assert result["results"][1]["metadata"]["query"] == "SELECT id, name FROM users WHERE id = 2"
+            
+            # Verify third query result
+            assert result["results"][2]["data"] == [{"count": 100}]
+            assert result["results"][2]["row_count"] == 1
+            assert result["results"][2]["metadata"]["columns"] == ["count"]
+            assert result["results"][2]["metadata"]["query"] == "SELECT COUNT(*) as count FROM users"
+            
+            # Verify execute_query was called three times
+            assert mock_execute.call_count == 3


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
