This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 37696d2e847 [fix][test] Fix flaky TopicFromMessageTest by using unique topic names (#25494)
37696d2e847 is described below

commit 37696d2e84771f379583068b4e3a59102c4fa024
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 8 20:41:57 2026 -0700

    [fix][test] Fix flaky TopicFromMessageTest by using unique topic names (#25494)
---
 .../pulsar/client/impl/TopicFromMessageTest.java   | 71 +++++++---------------
 1 file changed, 21 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
index 68f0f6efedb..e4caf980c80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
@@ -22,12 +22,9 @@ import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
-import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
@@ -36,38 +33,8 @@ public class TopicFromMessageTest extends 
SharedPulsarBaseTest {
     private static final long TEST_TIMEOUT = 90000; // 1.5 min
     private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2;
 
-    @Override
-    @BeforeClass(alwaysRun = true)
-    public void setupSharedCluster() throws Exception {
-        super.setupSharedCluster();
-        // These tests use short topic names (e.g. "topic1") which resolve to 
public/default
-        try {
-            admin.tenants().createTenant("public",
-                    new TenantInfoImpl(Set.of(), 
Set.of(SharedPulsarCluster.CLUSTER_NAME)));
-        } catch (Exception e) {
-            // tenant may already exist
-        }
-        try {
-            admin.namespaces().createNamespace("public/default",
-                    Set.of(SharedPulsarCluster.CLUSTER_NAME));
-        } catch (Exception e) {
-            // namespace may already exist
-        }
-    }
-
-    @Test(timeOut = TEST_TIMEOUT)
-    public void testSingleTopicConsumerNoBatchShortName() throws Exception {
-        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("topic1").subscriptionName("sub1").subscribe();
-             Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("topic1").enableBatching(false).create()) {
-            producer.send("foobar".getBytes());
-            Assert.assertEquals(consumer.receive().getTopicName(), 
"persistent://public/default/topic1");
-        }
-    }
-
     @Test(timeOut = TEST_TIMEOUT)
-    public void testSingleTopicConsumerNoBatchFullName() throws Exception {
+    public void testSingleTopicConsumerNoBatch() throws Exception {
         final String topic = newTopicName();
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic).subscriptionName("sub1").subscribe();
@@ -79,17 +46,19 @@ public class TopicFromMessageTest extends 
SharedPulsarBaseTest {
     }
 
     @Test(timeOut = TEST_TIMEOUT)
-    public void testMultiTopicConsumerNoBatchShortName() throws Exception {
+    public void testMultiTopicConsumerNoBatch() throws Exception {
+        final String topic1 = newTopicName();
+        final String topic2 = newTopicName();
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topics(Lists.newArrayList("topic1", 
"topic2")).subscriptionName("sub1").subscribe();
+                .topics(Lists.newArrayList(topic1, 
topic2)).subscriptionName("sub1").subscribe();
              Producer<byte[]> producer1 = pulsarClient.newProducer()
-                .topic("topic1").enableBatching(false).create();
+                .topic(topic1).enableBatching(false).create();
              Producer<byte[]> producer2 = pulsarClient.newProducer()
-                .topic("topic2").enableBatching(false).create()) {
+                .topic(topic2).enableBatching(false).create()) {
 
             Set<String> topicSet = new HashSet<>();
-            topicSet.add("persistent://public/default/topic1");
-            topicSet.add("persistent://public/default/topic2");
+            topicSet.add(topic1);
+            topicSet.add(topic2);
             producer1.send("foobar".getBytes());
             producer2.send("foobar".getBytes());
             
Assert.assertTrue(topicSet.remove(consumer.receive().getTopicName()));
@@ -98,25 +67,28 @@ public class TopicFromMessageTest extends 
SharedPulsarBaseTest {
     }
 
     @Test(timeOut = TEST_TIMEOUT)
-    public void testSingleTopicConsumerBatchShortName() throws Exception {
+    public void testSingleTopicConsumerBatch() throws Exception {
+        final String topic = newTopicName();
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("topic1").subscriptionName("sub1").subscribe();
+                .topic(topic).subscriptionName("sub1").subscribe();
              Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create())
 {
+                
.topic(topic).enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create())
 {
             producer.send("foobar".getBytes());
 
-            Assert.assertEquals(consumer.receive().getTopicName(), 
"persistent://public/default/topic1");
+            Assert.assertEquals(consumer.receive().getTopicName(), topic);
         }
     }
 
     @Test(timeOut = TEST_TIMEOUT)
-    public void testMultiTopicConsumerBatchShortName() throws Exception {
+    public void testMultiTopicConsumerBatch() throws Exception {
+        final String topic1 = newTopicName();
+        final String topic2 = newTopicName();
         try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topics(Lists.newArrayList("topic1", 
"topic2")).subscriptionName("sub1").subscribe();
+                .topics(Lists.newArrayList(topic1, 
topic2)).subscriptionName("sub1").subscribe();
              Producer<byte[]> producer1 = pulsarClient.newProducer()
-                
.topic("topic1").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create();
+                
.topic(topic1).enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create();
              Producer<byte[]> producer2 = pulsarClient.newProducer()
-                
.topic("topic2").enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create())
 {
+                
.topic(topic2).enableBatching(true).batchingMaxMessages(BATCHING_MAX_MESSAGES_THRESHOLD).create())
 {
 
             producer1.send("foobar".getBytes());
             producer2.send("foobar".getBytes());
@@ -125,8 +97,7 @@ public class TopicFromMessageTest extends 
SharedPulsarBaseTest {
             String topicNameX = consumer.receive().getTopicName();
             String topicNameY = consumer.receive().getTopicName();
             Object[] actualTopicNames = new Object[]{topicNameX, topicNameY};
-            Object[] expectedTopicNames =
-                    new Object[]{"persistent://public/default/topic1", 
"persistent://public/default/topic2"};
+            Object[] expectedTopicNames = new Object[]{topic1, topic2};
             Assert.assertEqualsNoOrder(actualTopicNames, expectedTopicNames);
         }
     }

Reply via email to