This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a26b1eeee754b58ec66c8e055cdf89f65ad65dc8
Author: camby <camby...@tencent.com>
AuthorDate: Mon Mar 4 17:26:21 2024 +0800

    [fix](connections) fix connection hang after too many connections (#31594)
    
    Issue Number: close #31569
    
    Fix FE connection hang when QPS is too high.
    
    After the fix, the third SQL statement returns an error instead of hanging:
    ERROR 1203 (HY000): #42000Too many connections
---
 .../org/apache/doris/mysql/AcceptListener.java     | 107 +++++++++++----------
 .../java/org/apache/doris/mysql/MysqlServer.java   |   2 +-
 2 files changed, 59 insertions(+), 50 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index bee00d9e696..4aa6cc85abe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -32,6 +32,7 @@ import org.xnio.StreamConnection;
 import org.xnio.channels.AcceptingChannel;
 
 import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * listener for accept mysql connections.
@@ -64,57 +65,65 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
             context.setEnv(Env.getCurrentEnv());
             connectScheduler.submit(context);
 
-            channel.getWorker().execute(() -> {
-                try {
-                    // Set thread local info
-                    context.setThreadLocalInfo();
-                    context.setConnectScheduler(connectScheduler);
-                    // authenticate check failed.
-                    if (!MysqlProto.negotiate(context)) {
-                        throw new AfterConnectedException("mysql negotiate 
failed");
-                    }
-                    if (connectScheduler.registerConnection(context)) {
-                        MysqlProto.sendResponsePacket(context);
-                        connection.setCloseListener(streamConnection -> 
connectScheduler.unregisterConnection(context));
-                    } else {
-                        
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
-                                "Reach limit of connections");
-                        MysqlProto.sendResponsePacket(context);
-                        throw new AfterConnectedException("Reach limit of 
connections");
-                    }
-                    context.setStartTime();
-                    int userQueryTimeout = 
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
-                    if (userQueryTimeout <= 0) {
-                        LOG.warn("Connection set query timeout to {}",
-                                    
context.getSessionVariable().getQueryTimeoutS());
-                    }
-                    context.setUserQueryTimeout(userQueryTimeout);
-                    context.setUserInsertTimeout(
-                            
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
-                    ConnectProcessor processor = new 
MysqlConnectProcessor(context);
-                    context.startAcceptQuery(processor);
-                } catch (AfterConnectedException e) {
-                    // do not need to print log for this kind of exception.
-                    // just clean up the context;
-                    context.cleanup();
-                } catch (Throwable e) {
-                    // should be unexpected exception, so print warn log
-                    if (context.getCurrentUserIdentity() != null) {
-                        LOG.warn("connect processor exception because ", e);
-                    } else if (e instanceof Error) {
-                        LOG.error("connect processor exception because ", e);
-                    } else {
-                        // for unauthrorized access such lvs probe request,
-                        // may cause exception, just log it in debug level
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("connect processor exception because ", 
e);
+            try {
+                channel.getWorker().execute(() -> {
+                    try {
+                        // Set thread local info
+                        context.setThreadLocalInfo();
+                        context.setConnectScheduler(connectScheduler);
+                        // authenticate check failed.
+                        if (!MysqlProto.negotiate(context)) {
+                            throw new AfterConnectedException("mysql negotiate 
failed");
+                        }
+                        if (connectScheduler.registerConnection(context)) {
+                            MysqlProto.sendResponsePacket(context);
+                            connection.setCloseListener(
+                                    streamConnection -> 
connectScheduler.unregisterConnection(context));
+                        } else {
+                            
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+                                    "Reach limit of connections");
+                            MysqlProto.sendResponsePacket(context);
+                            throw new AfterConnectedException("Reach limit of 
connections");
                         }
+                        context.setStartTime();
+                        int userQueryTimeout = 
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
+                        if (userQueryTimeout <= 0) {
+                            LOG.warn("Connection set query timeout to {}",
+                                        
context.getSessionVariable().getQueryTimeoutS());
+                        }
+                        context.setUserQueryTimeout(userQueryTimeout);
+                        context.setUserInsertTimeout(
+                                
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
+                        ConnectProcessor processor = new 
MysqlConnectProcessor(context);
+                        context.startAcceptQuery(processor);
+                    } catch (AfterConnectedException e) {
+                        // do not need to print log for this kind of exception.
+                        // just clean up the context;
+                        context.cleanup();
+                    } catch (Throwable e) {
+                        // should be unexpected exception, so print warn log
+                        if (context.getCurrentUserIdentity() != null) {
+                            LOG.warn("connect processor exception because ", 
e);
+                        } else if (e instanceof Error) {
+                            LOG.error("connect processor exception because ", 
e);
+                        } else {
+                            // for unauthrorized access such lvs probe request,
+                            // may cause exception, just log it in debug level
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("connect processor exception because 
", e);
+                            }
+                        }
+                        context.cleanup();
+                    } finally {
+                        ConnectContext.remove();
                     }
-                    context.cleanup();
-                } finally {
-                    ConnectContext.remove();
-                }
-            });
+                });
+            } catch (RejectedExecutionException e) {
+                
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+                        "Too many connections");
+                MysqlProto.sendResponsePacket(context);
+                context.cleanup();
+            }
         } catch (IOException e) {
             LOG.warn("Connection accept failed.", e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
index 4fb11136132..5f70e3000b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
@@ -51,7 +51,7 @@ public class MysqlServer {
     private AcceptingChannel<StreamConnection> server;
 
     // default task service.
-    private ExecutorService taskService = 
ThreadPoolManager.newDaemonCacheThreadPool(
+    private ExecutorService taskService = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
             Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true);
 
     public MysqlServer(int port, ConnectScheduler connectScheduler) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to