This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 141438756c1 [fix][test] Fix flaky SaslAuthenticateTest.testMaxInflightContext() test (#25948)
141438756c1 is described below

commit 141438756c1bae463c26c4f2f7ee77eb6353c315
Author: Oneby Wang <[email protected]>
AuthorDate: Sat Jun 6 23:29:09 2026 +0800

    [fix][test] Fix flaky SaslAuthenticateTest.testMaxInflightContext() test (#25948)
---
 .../authentication/AuthenticationProviderSasl.java | 11 ++++
 .../authentication/SaslAuthenticateTest.java       | 70 +++++++++++++++++++---
 2 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index 6dc2ad02933..5e539275af8 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -35,6 +35,7 @@ import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
 import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
 import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpServletResponse;
 import java.io.IOException;
@@ -334,6 +335,16 @@ public class AuthenticationProviderSasl implements AuthenticationProvider {
         }
     }
 
+    @VisibleForTesting
+    Cache<Long, AuthenticationState> getAuthStates() {
+        return authStates;
+    }
+
+    @VisibleForTesting
+    void setAuthStates(Cache<Long, AuthenticationState> authStates) {
+        this.authStates = authStates;
+    }
+
     private String sanitizeHeaderValue(String value) {
         if (value == null) {
             return null;
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index bb8595daced..24d895dffca 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ImmutableSet;
 import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpServletResponse;
@@ -42,6 +43,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javax.security.auth.login.Configuration;
 import lombok.Cleanup;
@@ -356,12 +360,11 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testMaxInflightContext() throws Exception {
         @Cleanup
         AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl();
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
-        doReturn("Init").when(servletRequest).getHeader("State");
+        doReturn(SaslConstants.SASL_STATE_CLIENT_INIT).when(servletRequest).getHeader(SaslConstants.SASL_HEADER_STATE);
         conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
         conf.setMaxInflightSaslContext(1);
         saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
@@ -370,14 +373,65 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
             AuthenticationDataProvider dataProvider =  authSasl.getAuthData("localhost");
             AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             doReturn(Base64.getEncoder().encodeToString(initData1.getBytes())).when(
-                    servletRequest).getHeader("SASL-Token");
-            doReturn(String.valueOf(i)).when(servletRequest).getHeader("SASL-Server-ID");
+                    servletRequest).getHeader(SaslConstants.SASL_AUTH_TOKEN);
+            doReturn(String.valueOf(i)).when(servletRequest).getHeader(SaslConstants.SASL_STATE_SERVER);
             saslServer.authenticateHttpRequest(servletRequest, mock(HttpServletResponse.class));
         }
-        Field field = AuthenticationProviderSasl.class.getDeclaredField("authStates");
-        field.setAccessible(true);
-        Cache<Long, AuthenticationState> cache = (Cache<Long, AuthenticationState>) field.get(saslServer);
+        Cache<Long, AuthenticationState> cache = saslServer.getAuthStates();
         //only 1 context was left in the memory
-        assertEquals(cache.asMap().size(), 1);
+        // Caffeine may perform size-based eviction asynchronously, so force maintenance before asserting.
+        cache.cleanUp();
+        assertEquals(cache.asMap().size(), conf.getMaxInflightSaslContext());
+    }
+
+    @Test
+    public void testMaxInflightContextWithDelayedCaffeineMaintenance() throws Exception {
+        @Cleanup
+        AuthenticationProviderSasl saslServer = new AuthenticationProviderSasl();
+        HttpServletRequest servletRequest = mock(HttpServletRequest.class);
+        doReturn(SaslConstants.SASL_STATE_CLIENT_INIT).when(servletRequest).getHeader(SaslConstants.SASL_HEADER_STATE);
+        conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
+        conf.setMaxInflightSaslContext(1);
+        saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
+
+        CountDownLatch maintenanceStarted = new CountDownLatch(1);
+        CountDownLatch allowMaintenance = new CountDownLatch(1);
+        @Cleanup("shutdownNow")
+        ExecutorService maintenanceExecutor = Executors.newSingleThreadExecutor();
+        Cache<Long, AuthenticationState> delayedMaintenanceCache = Caffeine.newBuilder()
+                .maximumSize(conf.getMaxInflightSaslContext())
+                .expireAfterWrite(conf.getInflightSaslContextExpiryMs(), TimeUnit.MILLISECONDS)
+                .executor(command -> maintenanceExecutor.execute(() -> {
+                    maintenanceStarted.countDown();
+                    try {
+                        allowMaintenance.await();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    command.run();
+                }))
+                .build();
+        saslServer.setAuthStates(delayedMaintenanceCache);
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                AuthenticationDataProvider dataProvider = authSasl.getAuthData("localhost");
+                AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+                doReturn(Base64.getEncoder().encodeToString(initData1.getBytes())).when(
+                        servletRequest).getHeader(SaslConstants.SASL_AUTH_TOKEN);
+                doReturn(String.valueOf(i)).when(servletRequest).getHeader(SaslConstants.SASL_STATE_SERVER);
+                saslServer.authenticateHttpRequest(servletRequest, mock(HttpServletResponse.class));
+            }
+            assertTrue(maintenanceStarted.await(5, TimeUnit.SECONDS));
+
+            Cache<Long, AuthenticationState> cache = saslServer.getAuthStates();
+            assertTrue(cache.asMap().size() > conf.getMaxInflightSaslContext());
+
+            // Caffeine may perform size-based eviction asynchronously, so force maintenance before asserting.
+            cache.cleanUp();
+            assertEquals(cache.asMap().size(), conf.getMaxInflightSaslContext());
+        } finally {
+            allowMaintenance.countDown();
+        }
     }
 }

Reply via email to