This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e32932006ac [fix](connections) fix connection hang after too many 
connections #31761
e32932006ac is described below

commit e32932006ace0f5a3f8a7deb9cc3bb0d833b0e7f
Author: camby <camby...@tencent.com>
AuthorDate: Wed Mar 6 20:38:14 2024 +0800

    [fix](connections) fix connection hang after too many connections #31761
---
 .../org/apache/doris/mysql/AcceptListener.java     | 97 ++++++++++++----------
 .../java/org/apache/doris/mysql/MysqlServer.java   |  2 +-
 2 files changed, 54 insertions(+), 45 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 1bde95c1650..30d0693a6b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -31,6 +31,7 @@ import org.xnio.StreamConnection;
 import org.xnio.channels.AcceptingChannel;
 
 import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * listener for accept mysql connections.
@@ -58,51 +59,59 @@ public class AcceptListener implements 
ChannelListener<AcceptingChannel<StreamCo
             context.setEnv(Env.getCurrentEnv());
             connectScheduler.submit(context);
 
-            channel.getWorker().execute(() -> {
-                try {
-                    // Set thread local info
-                    context.setThreadLocalInfo();
-                    context.setConnectScheduler(connectScheduler);
-                    // authenticate check failed.
-                    if (!MysqlProto.negotiate(context)) {
-                        throw new AfterConnectedException("mysql negotiate 
failed");
+            try {
+                channel.getWorker().execute(() -> {
+                    try {
+                        // Set thread local info
+                        context.setThreadLocalInfo();
+                        context.setConnectScheduler(connectScheduler);
+                        // authenticate check failed.
+                        if (!MysqlProto.negotiate(context)) {
+                            throw new AfterConnectedException("mysql negotiate 
failed");
+                        }
+                        if (connectScheduler.registerConnection(context)) {
+                            MysqlProto.sendResponsePacket(context);
+                            connection.setCloseListener(
+                                    streamConnection -> 
connectScheduler.unregisterConnection(context));
+                        } else {
+                            
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+                                    "Reach limit of connections");
+                            MysqlProto.sendResponsePacket(context);
+                            throw new AfterConnectedException("Reach limit of 
connections");
+                        }
+                        context.setStartTime();
+                        context.setUserQueryTimeout(
+                                
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
+                        context.setUserInsertTimeout(
+                                
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
+                        ConnectProcessor processor = new 
ConnectProcessor(context);
+                        context.startAcceptQuery(processor);
+                    } catch (AfterConnectedException e) {
+                        // do not need to print log for this kind of exception.
+                        // just clean up the context;
+                        context.cleanup();
+                    } catch (Throwable e) {
+                        // should be unexpected exception, so print warn log
+                        if (context.getCurrentUserIdentity() != null) {
+                            LOG.warn("connect processor exception because ", 
e);
+                        } else if (e instanceof Error) {
+                            LOG.error("connect processor exception because ", 
e);
+                        } else {
+                            // for unauthrorized access such lvs probe request,
+                            // may cause exception, just log it in debug level
+                            LOG.debug("connect processor exception because ", 
e);
+                        }
+                        context.cleanup();
+                    } finally {
+                        ConnectContext.remove();
                     }
-                    if (connectScheduler.registerConnection(context)) {
-                        MysqlProto.sendResponsePacket(context);
-                        connection.setCloseListener(streamConnection -> 
connectScheduler.unregisterConnection(context));
-                    } else {
-                        
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
-                                "Reach limit of connections");
-                        MysqlProto.sendResponsePacket(context);
-                        throw new AfterConnectedException("Reach limit of 
connections");
-                    }
-                    context.setStartTime();
-                    context.setUserQueryTimeout(
-                            
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
-                    context.setUserInsertTimeout(
-                            
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
-                    ConnectProcessor processor = new ConnectProcessor(context);
-                    context.startAcceptQuery(processor);
-                } catch (AfterConnectedException e) {
-                    // do not need to print log for this kind of exception.
-                    // just clean up the context;
-                    context.cleanup();
-                } catch (Throwable e) {
-                    // should be unexpected exception, so print warn log
-                    if (context.getCurrentUserIdentity() != null) {
-                        LOG.warn("connect processor exception because ", e);
-                    } else if (e instanceof Error) {
-                        LOG.error("connect processor exception because ", e);
-                    } else {
-                        // for unauthrorized access such lvs probe request,
-                        // may cause exception, just log it in debug level
-                        LOG.debug("connect processor exception because ", e);
-                    }
-                    context.cleanup();
-                } finally {
-                    ConnectContext.remove();
-                }
-            });
+                });
+            } catch (RejectedExecutionException e) {
+                
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+                        "Too many connections");
+                MysqlProto.sendResponsePacket(context);
+                context.cleanup();
+            }
         } catch (IOException e) {
             LOG.warn("Connection accept failed.", e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
index 4fb11136132..5f70e3000b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
@@ -51,7 +51,7 @@ public class MysqlServer {
     private AcceptingChannel<StreamConnection> server;
 
     // default task service.
-    private ExecutorService taskService = 
ThreadPoolManager.newDaemonCacheThreadPool(
+    private ExecutorService taskService = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
             Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true);
 
     public MysqlServer(int port, ConnectScheduler connectScheduler) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to