This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b9ef9239ddb83b5baa175ed2ba0b4687b1acaac
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Mar 23 17:11:17 2026 +0200

    [fix][client] Fix stale Healthy state in SameAuthParamsLookupAutoClusterFailover causing flaky test (#25388)
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
    (cherry picked from commit 3bc834f2fa8aae1ffe8f2fb3f795acd3d0de755d)
---
 .../SameAuthParamsLookupAutoClusterFailover.java   |   5 +
 ...ameAuthParamsLookupAutoClusterFailoverTest.java | 254 +++++++++++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
index 743f2e15164..5ae7b96906e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
@@ -143,6 +143,11 @@ public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvid
         for (int i = currentPulsarServiceIndex + 1; i < 
pulsarServiceUrlArray.length; i++) {
             if (probeAvailable(i)) {
                 return i;
+            } else {
+                // Mark the service as Failed to prevent a spurious recovery to it
+                // after we failover to a higher-indexed service.
+                pulsarServiceStateArray[i] = PulsarServiceState.Failed;
+                checkCounterArray[i].setValue(0);
             }
         }
         return -1;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java
new file mode 100644
index 00000000000..31e2bf6aaed
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoopGroup;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import 
org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class SameAuthParamsLookupAutoClusterFailoverTest {
+
+    private static final String URL0 = "pulsar://broker0:6650";
+    private static final String URL1 = "pulsar://broker1:6650";
+    private static final String URL2 = "pulsar://broker2:6650";
+
+    private EventLoopGroup executor;
+    private PulsarClientImpl mockClient;
+    private SameAuthParamsLookupAutoClusterFailover failover;
+    private PulsarServiceState[] stateArray;
+    private MutableInt[] counterArray;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        executor = EventLoopUtil.newEventLoopGroup(1, false,
+                new ExecutorProvider.ExtendedThreadFactory("test-failover"));
+
+        String[] urlArray = new String[]{URL0, URL1, URL2};
+        failover = SameAuthParamsLookupAutoClusterFailover.builder()
+                .pulsarServiceUrlArray(urlArray)
+                .failoverThreshold(1)
+                .recoverThreshold(2)
+                .checkHealthyIntervalMs(100)
+                .testTopic("a/b/c")
+                .markTopicNotFoundAsAvailable(true)
+                .build();
+
+        mockClient = mock(PulsarClientImpl.class);
+        doNothing().when(mockClient).updateServiceUrl(anyString());
+        doNothing().when(mockClient).reloadLookUp();
+
+        FieldUtils.writeField(failover, "pulsarClient", mockClient, true);
+        FieldUtils.writeField(failover, "executor", executor, true);
+
+        stateArray = (PulsarServiceState[]) FieldUtils.readField(failover, 
"pulsarServiceStateArray", true);
+        counterArray = (MutableInt[]) FieldUtils.readField(failover, 
"checkCounterArray", true);
+    }
+
+    /**
+     * Shuts down the per-test event loop group, if {@link #setup()} got far enough to create one.
+     * The shutdown is requested with a zero quiet period and a zero timeout and is not awaited.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (executor == null) {
+            return;
+        }
+        executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Makes {@code mockClient.getLookup(url)} return a stubbed {@link LookupService} whose
+     * {@code getBroker} call either succeeds or fails.
+     *
+     * @param url       service URL whose lookup is being stubbed
+     * @param available {@code true} to complete the lookup with a broker address,
+     *                  {@code false} to fail it with a {@link RuntimeException}
+     */
+    private void setLookupResult(String url, boolean available) {
+        CompletableFuture<LookupTopicResult> outcome;
+        if (!available) {
+            outcome = FutureUtil.failedFuture(new RuntimeException("connection refused"));
+        } else {
+            // The same unresolved address is passed for both address arguments of the result.
+            InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("broker", 6650);
+            outcome = CompletableFuture.completedFuture(
+                    new LookupTopicResult(brokerAddress, brokerAddress, false));
+        }
+
+        LookupService stubbedLookup = mock(LookupService.class);
+        when(stubbedLookup.getBroker(any())).thenReturn(outcome);
+        when(mockClient.getLookup(url)).thenReturn(stubbedLookup);
+    }
+
+    /**
+     * Regression test for findFailoverTo() passing over an unavailable service without
+     * marking it as Failed.
+     *
+     * <p>Scenario: services 0 and 1 are down and service 2 is up. Before the fix, a failover
+     * from index 0 to 2 left state[1] as a stale Healthy. On the following check cycle,
+     * firstHealthyPulsarService() would see state[1]=Healthy and "recover" to index 1, a
+     * service that is actually broken, producing the bounce 0 -> 2 -> 1 -> 2 instead of a clean
+     * 0 -> 2. Combined with 3-second probe timeouts on dead services, that bounce could push
+     * the total failover time past the test's awaitility timeout.
+     */
+    @Test(timeOut = 30000)
+    public void testFindFailoverToMarksSkippedServicesAsFailed() throws Exception {
+        // Both tasks below run on the executor thread, where the failover state is owned.
+        Runnable markService0Failed = () -> {
+            // As if checkPulsarServices had already detected that service 0 is down.
+            stateArray[0] = PulsarServiceState.Failed;
+            counterArray[0].setValue(0);
+        };
+        Runnable verifyFailoverOutcome = () -> {
+            assertEquals(failover.getCurrentPulsarServiceIndex(), 2,
+                    "Should have failed over to index 2");
+            // With the fix, findFailoverTo marks url1 as Failed when probing it fails.
+            assertEquals(stateArray[1], PulsarServiceState.Failed,
+                    "Service 1 should be marked Failed by findFailoverTo, not remain stale Healthy");
+            assertEquals(stateArray[2], PulsarServiceState.Healthy,
+                    "Service 2 should remain Healthy");
+        };
+
+        // Topology: url0 and url1 are down, url2 is healthy.
+        setLookupResult(URL0, false);
+        setLookupResult(URL1, false);
+        setLookupResult(URL2, true);
+
+        // Preset the state, then run exactly one check cycle.
+        runOnExecutor(markService0Failed);
+        runCheckCycle();
+
+        runOnExecutor(verifyFailoverOutcome);
+    }
+
+    /**
+     * Checks that the provider stays on index 2 after failing over to it. Without the fix, the
+     * first check cycle after the failover to index 2 would see the stale Healthy state[1] and
+     * immediately switch to index 1.
+     */
+    @Test(timeOut = 30000)
+    public void testNoSpuriousRecoveryBounceAfterFailover() throws Exception {
+        Runnable markService0Failed = () -> {
+            stateArray[0] = PulsarServiceState.Failed;
+            counterArray[0].setValue(0);
+        };
+        Runnable expectFailedOverToIndex2 = () ->
+                assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
+        Runnable expectStillOnIndex2 = () ->
+                assertEquals(failover.getCurrentPulsarServiceIndex(), 2,
+                        "Should stay at index 2, not bounce to index 1");
+
+        // Topology: url0 and url1 are down, url2 is healthy.
+        setLookupResult(URL0, false);
+        setLookupResult(URL1, false);
+        setLookupResult(URL2, true);
+
+        runOnExecutor(markService0Failed);
+
+        // First cycle performs the failover 0 -> 2.
+        runCheckCycle();
+        runOnExecutor(expectFailedOverToIndex2);
+
+        // Second cycle: without the fix, state[1] would still be a stale Healthy here and
+        // firstHealthyPulsarService would return 1, causing a spurious switch.
+        runCheckCycle();
+        runOnExecutor(expectStillOnIndex2);
+    }
+
+    /**
+     * Checks that a service which findFailoverTo marked as Failed can still be recovered to
+     * once it becomes available again.
+     */
+    @Test(timeOut = 30000)
+    public void testRecoveryAfterFindFailoverToMarksServiceFailed() throws Exception {
+        Runnable markService0Failed = () -> {
+            stateArray[0] = PulsarServiceState.Failed;
+            counterArray[0].setValue(0);
+        };
+        Runnable expectFailoverAndService1Failed = () -> {
+            assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
+            assertEquals(stateArray[1], PulsarServiceState.Failed);
+        };
+        Runnable expectRecoveredToService1 = () -> {
+            assertEquals(failover.getCurrentPulsarServiceIndex(), 1,
+                    "Should recover to index 1 after it becomes healthy");
+            assertEquals(stateArray[1], PulsarServiceState.Healthy);
+        };
+
+        // Topology: url0 and url1 are down, url2 is healthy.
+        setLookupResult(URL0, false);
+        setLookupResult(URL1, false);
+        setLookupResult(URL2, true);
+
+        // Preset state[0] to Failed and trigger the failover 0 -> 2.
+        runOnExecutor(markService0Failed);
+        runCheckCycle();
+        runOnExecutor(expectFailoverAndService1Failed);
+
+        // Simulate service 1 coming back.
+        setLookupResult(URL1, true);
+
+        // Each attempt runs one more check cycle, until service 1 has recovered:
+        // Failed -> PreRecover (1 check) -> Healthy (recoverThreshold=2, so 1 more check).
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            runCheckCycle();
+            runOnExecutor(expectRecoveredToService1);
+        });
+    }
+
+    /**
+     * Submits {@code task} to the test executor and waits up to 5 seconds for it to finish.
+     *
+     * @param task work to run on the executor
+     * @throws AssertionError if the task failed with one; it is rethrown unwrapped
+     * @throws Exception      the {@link java.util.concurrent.ExecutionException} for any other
+     *                        task failure, or whatever the timed wait itself throws
+     */
+    private void runOnExecutor(Runnable task) throws Exception {
+        java.util.concurrent.Future<?> pending = executor.submit(task);
+        try {
+            pending.get(5, TimeUnit.SECONDS);
+        } catch (java.util.concurrent.ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            if (!(cause instanceof AssertionError)) {
+                throw wrapped;
+            }
+            // Surface assertion failures directly so that Awaitility sees an AssertionError.
+            throw (AssertionError) cause;
+        }
+    }
+
+    /**
+     * Runs one check cycle on the executor by reflectively invoking the provider's
+     * {@code checkPulsarServices}, {@code firstHealthyPulsarService}, {@code findFailoverTo} and
+     * {@code updateServiceUrl(int)} methods.
+     *
+     * <p>After the check, nothing more happens if the first healthy index equals the current
+     * index. If there is no healthy index (negative result), a failover target is looked up and
+     * applied only when it is non-negative. Otherwise the provider is switched to the first
+     * healthy index.
+     *
+     * @throws Exception propagated from {@link #runOnExecutor(Runnable)}; any checked exception
+     *                   raised inside the cycle is wrapped in a {@link RuntimeException}
+     */
+    private void runCheckCycle() throws Exception {
+        runOnExecutor(() -> {
+            try {
+                Class<?> providerType = SameAuthParamsLookupAutoClusterFailover.class;
+                Method probeServices = providerType.getDeclaredMethod("checkPulsarServices");
+                Method pickHealthy = providerType.getDeclaredMethod("firstHealthyPulsarService");
+                Method pickFailoverTarget = providerType.getDeclaredMethod("findFailoverTo");
+                Method switchTo = providerType.getDeclaredMethod("updateServiceUrl", int.class);
+                for (Method reflected : new Method[]{probeServices, pickHealthy, pickFailoverTarget, switchTo}) {
+                    reflected.setAccessible(true);
+                }
+
+                probeServices.invoke(failover);
+                int healthyIndex = (int) pickHealthy.invoke(failover);
+                if (healthyIndex == failover.getCurrentPulsarServiceIndex()) {
+                    // Already on the first healthy service.
+                    return;
+                }
+
+                int targetIndex = healthyIndex;
+                if (healthyIndex < 0) {
+                    // No healthy service is known: probe for a failover target instead.
+                    targetIndex = (int) pickFailoverTarget.invoke(failover);
+                    if (targetIndex < 0) {
+                        return;
+                    }
+                }
+                switchTo.invoke(failover, targetIndex);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+}

Reply via email to