This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a066e68365 [fix](arrow-flight-sql) Fix exceed user property max 
connection cause `Reach limit of connections` (#39127)
7a066e68365 is described below

commit 7a066e68365839c6c0d73bd789acc04cd787322c
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Aug 23 10:07:57 2024 +0800

    [fix](arrow-flight-sql) Fix exceed user property max connection cause 
`Reach limit of connections` (#39127)
    
    Limit the number of Arrow Flight connections for a single user to at
    most half of the user property max_user_connections
    (max_user_connections / 2; 50 by default).
---
 .../service/arrowflight/DorisFlightSqlService.java |  4 ++
 .../arrowflight/tokens/FlightTokenManagerImpl.java | 53 +++++++++++++++++++---
 2 files changed, 51 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
index 85377788097..df9099c6816 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java
@@ -57,6 +57,10 @@ public class DorisFlightSqlService {
         DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, 
flightSessionsManager);
         flightServer = FlightServer.builder(allocator, location, producer)
                 .headerAuthenticator(new 
FlightBearerTokenAuthenticator(flightTokenManager)).build();
+        LOG.info("Arrow Flight SQL service is created, port: {}, 
token_cache_size: {}"
+                        + ", qe_max_connection: {}, token_alive_time: {}",
+                port, Config.arrow_flight_token_cache_size, 
Config.qe_max_connection,
+                Config.arrow_flight_token_alive_time);
     }
 
     // start Arrow Flight SQL service, return true if success, otherwise false
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
index cd1b492de06..57101d995e0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.doris.service.arrowflight.tokens;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
@@ -31,9 +32,12 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 
 import java.math.BigInteger;
 import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -46,7 +50,9 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
     private final int cacheSize;
     private final int cacheExpiration;
 
-    private LoadingCache<String, FlightTokenDetails> tokenCache;
+    private final LoadingCache<String, FlightTokenDetails> tokenCache;
+    // <username, <token, 1>>
+    private final ConcurrentHashMap<String, LoadingCache<String, Integer>> 
usersTokenLRU = new ConcurrentHashMap<>();
 
     public FlightTokenManagerImpl(final int cacheSize, final int 
cacheExpiration) {
         this.cacheSize = cacheSize;
@@ -56,17 +62,19 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
                 .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
                 .removalListener(new RemovalListener<String, 
FlightTokenDetails>() {
                     @Override
-                    public void onRemoval(RemovalNotification<String, 
FlightTokenDetails> notification) {
+                    public void onRemoval(@NotNull RemovalNotification<String, 
FlightTokenDetails> notification) {
                         // TODO: broadcast this message to other FE
-                        LOG.info("evict bearer token: " + 
notification.getKey() + ", reason: "
+                        String token = notification.getKey();
+                        FlightTokenDetails tokenDetails = 
notification.getValue();
+                        LOG.info("evict bearer token: " + token + ", reason: 
token number exceeded, "
                                 + notification.getCause());
                         ConnectContext context = 
ExecuteEnv.getInstance().getScheduler()
-                                .getContext(notification.getKey());
+                                .getContext(token);
                         if (context != null) {
                             
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
-                            LOG.info("unregister flight connect context after 
evict bearer token: "
-                                    + notification.getKey());
+                            LOG.info("unregister flight connect context after 
evict bearer token: " + token);
                         }
+                        
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
                     }
                 }).build(new CacheLoader<String, FlightTokenDetails>() {
                     @Override
@@ -96,6 +104,29 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
                 flightAuthResult.getUserIdentity(), 
flightAuthResult.getRemoteIp());
 
         tokenCache.put(token, flightTokenDetails);
+        if (!usersTokenLRU.containsKey(username)) {
+            // TODO Modify usersTokenLRU size when user property maxConn 
changes. but LoadingCache currently not
+            // support modify.
+            usersTokenLRU.put(username,
+                    
CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username)
 / 2)
+                            .removalListener(new RemovalListener<String, 
Integer>() {
+                                @Override
+                                public void onRemoval(@NotNull 
RemovalNotification<String, Integer> notification) {
+                                    // TODO: broadcast this message to other FE
+                                    assert notification.getKey() != null;
+                                    
tokenCache.invalidate(notification.getKey());
+                                    LOG.info("evict bearer token: " + 
notification.getKey()
+                                            + ", reason: user connection 
exceeded, " + notification.getCause());
+                                }
+                            }).build(new CacheLoader<String, Integer>() {
+                                @NotNull
+                                @Override
+                                public Integer load(@NotNull String key) {
+                                    return 1;
+                                }
+                            }));
+        }
+        usersTokenLRU.get(username).put(token, 1);
         LOG.info("Created flight token for user: {}, token: {}", username, 
token);
         return flightTokenDetails;
     }
@@ -114,6 +145,16 @@ public class FlightTokenManagerImpl implements 
FlightTokenManager {
             throw new IllegalArgumentException("bearer token expired: " + 
token + ", try reconnect, "
                     + "currently in fe.conf, `arrow_flight_token_alive_time`=" 
+ this.cacheExpiration);
         }
+        if (usersTokenLRU.containsKey(value.getUsername())) {
+            try {
+                usersTokenLRU.get(value.getUsername()).get(token);
+            } catch (ExecutionException ignored) {
+                throw new IllegalArgumentException("usersTokenLRU not exist 
bearer token: " + token);
+            }
+        } else {
+            throw new IllegalArgumentException(
+                    "bearer token not created: " + token + ", username:  " + 
value.getUsername());
+        }
         LOG.info("Validated bearer token for user: {}", value.getUsername());
         return value;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to