This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 543a75d902e [fix][test] Fix flaky
ResendRequestTest.testFailoverSingleAckedNormalTopic (#25364)
543a75d902e is described below
commit 543a75d902ea09d2ec3b51e237a206fa71bda8ed
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 16:08:29 2026 -0700
[fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic
(#25364)
---
.../pulsar/broker/service/ResendRequestTest.java | 48 +++++++++++++++-------
1 file changed, 34 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 02caf35efa8..5bc3e7e9480 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -27,10 +27,12 @@ import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -242,22 +244,40 @@ public class ResendRequestTest extends
SharedPulsarBaseTest {
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
- // 2. Create consumer
+ // 2. Create consumers
+ // Use ConsumerEventListener to identify which consumer the dispatcher
picks as active.
+ // becameActive may be called once or twice (re-evaluation when
consumer-b subscribes).
+ // We track the latest active consumer and wait until both consumers
have been registered.
+ AtomicReference<Consumer<?>> activeConsumerRef = new
AtomicReference<>();
+ ConsumerEventListener listener = new ConsumerEventListener() {
+ @Override
+ public void becameActive(Consumer<?> consumer, int partitionId) {
+ activeConsumerRef.set(consumer);
+ }
+
+ @Override
+ public void becameInactive(Consumer<?> consumer, int partitionId) {
+ }
+ };
ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
- .acknowledgmentGroupTime(0, TimeUnit.SECONDS);
- Consumer<byte[]> consumer1 =
consumerBuilder.clone().consumerName("consumer-1").subscribe();
- Consumer<byte[]> consumer2 =
consumerBuilder.clone().consumerName("consumer-2").subscribe();
-
- // Wait for failover consumer assignment to settle so consumer-1 is
the active consumer
- Awaitility.await().untilAsserted(() -> {
- Subscription sub = topicRef.getSubscription(subscriptionName);
- assertNotNull(sub);
- AbstractDispatcherSingleActiveConsumer dispatcher =
- (AbstractDispatcherSingleActiveConsumer)
sub.getDispatcher();
- assertEquals(dispatcher.getConsumers().size(), 2);
- assertEquals(dispatcher.getActiveConsumer().consumerName(),
"consumer-1");
- });
+ .acknowledgmentGroupTime(0,
TimeUnit.SECONDS).consumerEventListener(listener);
+ Consumer<byte[]> consumerA =
consumerBuilder.clone().consumerName("consumer-a").subscribe();
+ Consumer<byte[]> consumerB =
consumerBuilder.clone().consumerName("consumer-b").subscribe();
+
+ // Wait until the active consumer is assigned (becameActive has been
called at least once)
+ Awaitility.await().untilAsserted(() ->
assertNotNull(activeConsumerRef.get()));
+
+ Consumer<?> activeConsumer = activeConsumerRef.get();
+ Consumer<byte[]> consumer1; // active
+ Consumer<byte[]> consumer2; // standby
+ if (activeConsumer == consumerA) {
+ consumer1 = consumerA;
+ consumer2 = consumerB;
+ } else {
+ consumer1 = consumerB;
+ consumer2 = consumerA;
+ }
// 3. Producer publishes messages
for (int i = 0; i < totalMessages; i++) {