This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 543a75d902e [fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic (#25364)
543a75d902e is described below

commit 543a75d902ea09d2ec3b51e237a206fa71bda8ed
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 16:08:29 2026 -0700

    [fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic (#25364)
---
 .../pulsar/broker/service/ResendRequestTest.java   | 48 +++++++++++++++-------
 1 file changed, 34 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 02caf35efa8..5bc3e7e9480 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -27,10 +27,12 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -242,22 +244,40 @@ public class ResendRequestTest extends 
SharedPulsarBaseTest {
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
-        // 2. Create consumer
+        // 2. Create consumers
+        // Use ConsumerEventListener to identify which consumer the dispatcher picks as active.
+        // becameActive may be called once or twice (re-evaluation when consumer-b subscribes).
+        // We track the latest active consumer and read it only after both consumers have subscribed.
+        AtomicReference<Consumer<?>> activeConsumerRef = new 
AtomicReference<>();
+        ConsumerEventListener listener = new ConsumerEventListener() {
+            @Override
+            public void becameActive(Consumer<?> consumer, int partitionId) {
+                activeConsumerRef.set(consumer);
+            }
+
+            @Override
+            public void becameInactive(Consumer<?> consumer, int partitionId) {
+            }
+        };
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
-                .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
-        Consumer<byte[]> consumer1 = 
consumerBuilder.clone().consumerName("consumer-1").subscribe();
-        Consumer<byte[]> consumer2 = 
consumerBuilder.clone().consumerName("consumer-2").subscribe();
-
-        // Wait for failover consumer assignment to settle so consumer-1 is 
the active consumer
-        Awaitility.await().untilAsserted(() -> {
-            Subscription sub = topicRef.getSubscription(subscriptionName);
-            assertNotNull(sub);
-            AbstractDispatcherSingleActiveConsumer dispatcher =
-                    (AbstractDispatcherSingleActiveConsumer) 
sub.getDispatcher();
-            assertEquals(dispatcher.getConsumers().size(), 2);
-            assertEquals(dispatcher.getActiveConsumer().consumerName(), 
"consumer-1");
-        });
+                .acknowledgmentGroupTime(0, 
TimeUnit.SECONDS).consumerEventListener(listener);
+        Consumer<byte[]> consumerA = 
consumerBuilder.clone().consumerName("consumer-a").subscribe();
+        Consumer<byte[]> consumerB = 
consumerBuilder.clone().consumerName("consumer-b").subscribe();
+
+        // Wait until the active consumer is assigned (becameActive has been 
called at least once)
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(activeConsumerRef.get()));
+
+        Consumer<?> activeConsumer = activeConsumerRef.get();
+        Consumer<byte[]> consumer1;  // active
+        Consumer<byte[]> consumer2;  // standby
+        if (activeConsumer == consumerA) {
+            consumer1 = consumerA;
+            consumer2 = consumerB;
+        } else {
+            consumer1 = consumerB;
+            consumer2 = consumerA;
+        }
 
         // 3. Producer publishes messages
         for (int i = 0; i < totalMessages; i++) {

Reply via email to