This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0163ce8677b [cleanup] Convert 12 test classes to SharedPulsarBaseTest 
(#25392)
0163ce8677b is described below

commit 0163ce8677bbb72ec959052d55d0ce43918ca469
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 24 08:05:06 2026 -0700

    [cleanup] Convert 12 test classes to SharedPulsarBaseTest (#25392)
---
 .../broker/admin/MaxUnackedMessagesTest.java       | 213 ++++++++++-----------
 .../RGUsageMTAggrWaitForAllMsgsTest.java           |  44 ++---
 .../ResourceGroupUsageAggregationTest.java         |  44 ++---
 .../broker/service/schema/ClientGetSchemaTest.java | 149 +++++---------
 .../stats/BookieClientsStatsGeneratorTest.java     |  22 +--
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 185 ++++++++----------
 .../pulsar/client/api/DeadLetterTopicTest.java     | 148 +++++++-------
 .../client/api/PatternMultiTopicsConsumerTest.java |  40 ++--
 .../pulsar/client/api/UnloadSubscriptionTest.java  | 134 ++++++-------
 .../apache/pulsar/client/impl/MessageIdTest.java   |  22 +--
 .../PerMessageUnAcknowledgedRedeliveryTest.java    |  28 +--
 .../pulsar/client/impl/ProducerCloseTest.java      |  58 ++----
 .../pulsar/client/impl/TopicDoesNotExistsTest.java |  50 ++---
 .../processor/MessagePayloadProcessorTest.java     |  36 +---
 14 files changed, 479 insertions(+), 694 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
index 9a8e8fa5a50..7a67fb22198 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -27,50 +27,29 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
-public class MaxUnackedMessagesTest extends ProducerConsumerBase {
-    private final String testTenant = "my-property";
-    private final String testNamespace = "my-ns";
-    private final String myNamespace = testTenant + "/" + testNamespace;
-    private final String testTopic = "persistent://" + myNamespace + 
"/max-unacked-";
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class MaxUnackedMessagesTest extends SharedPulsarBaseTest {
 
     @Test(timeOut = 10000)
     public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
-        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
         waitCacheInit(topicName);
         Integer max = 
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
@@ -89,97 +68,99 @@ public class MaxUnackedMessagesTest extends 
ProducerConsumerBase {
     // See https://github.com/apache/pulsar/issues/5438
     @Test(timeOut = 20000)
     public void testMaxUnackedMessagesOnSubscription() throws Exception {
-        final String topicName = testTopic + System.currentTimeMillis();
-        final String subscriberName = "test-sub" + System.currentTimeMillis();
+        final String topicName = newTopicName();
+        final String subscriberName = "test-sub";
         final int unackMsgAllowed = 100;
         final int receiverQueueSize = 10;
         final int totalProducedMsgs = 200;
 
-        
pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
-        ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
-                
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
-                .subscriptionType(SubscriptionType.Shared);
-        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
-        Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
-        List<Consumer<?>> consumers = List.of(consumer1, consumer2, consumer3);
-        waitCacheInit(topicName);
-        admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 
unackMsgAllowed);
-        Awaitility.await().untilAsserted(()
-                -> 
assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        int origValue = getConfig().getMaxUnackedMessagesPerSubscription();
+        getConfig().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
+        try {
+            ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
+                    
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
+                    .subscriptionType(SubscriptionType.Shared);
+            Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+            Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+            Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+            List<Consumer<?>> consumers = List.of(consumer1, consumer2, 
consumer3);
+            waitCacheInit(topicName);
+            admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 
unackMsgAllowed);
+            Awaitility.await().untilAsserted(()
+                    -> 
assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
+            Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
-        // (1) Produced Messages
-        for (int i = 0; i < totalProducedMsgs; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
-        }
+            // (1) Produced Messages
+            for (int i = 0; i < totalProducedMsgs; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
 
-        // (2) try to consume messages: but will be able to consume number of 
messages = unackMsgAllowed
-        Message<?> msg = null;
-        Map<Message<?>, Consumer<?>> messages = new HashMap<>();
-        for (int i = 0; i < 3; i++) {
-            for (int j = 0; j < totalProducedMsgs; j++) {
-                msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    messages.put(msg, consumers.get(i));
-                } else {
-                    break;
+            // (2) try to consume messages: but will be able to consume number 
of messages = unackMsgAllowed
+            Message<?> msg = null;
+            Map<Message<?>, Consumer<?>> messages = new HashMap<>();
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < totalProducedMsgs; j++) {
+                    msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        messages.put(msg, consumers.get(i));
+                    } else {
+                        break;
+                    }
                 }
             }
-        }
 
-        // client must receive number of messages = unAckedMessagesBufferSize 
rather all produced messages: check
-        // delta as 3 consumers with receiverQueueSize = 10
-        assertEquals(unackMsgAllowed, messages.size(), receiverQueueSize * 3);
+            // client must receive number of messages = 
unAckedMessagesBufferSize rather all produced messages
+            assertEquals(unackMsgAllowed, messages.size(), receiverQueueSize * 
3);
 
-        // start acknowledging messages
-        messages.forEach((m, c) -> {
-            try {
-                c.acknowledge(m);
-            } catch (PulsarClientException e) {
-                fail("ack failed", e);
-            }
-        });
+            // start acknowledging messages
+            messages.forEach((m, c) -> {
+                try {
+                    c.acknowledge(m);
+                } catch (PulsarClientException e) {
+                    fail("ack failed", e);
+                }
+            });
 
-        // try to consume remaining messages: broker may take time to deliver 
so, retry multiple time to consume
-        // all messages
-        Set<MessageId> result = ConcurrentHashMap.newKeySet();
-        // expecting messages which are not received
-        int expectedRemainingMessages = totalProducedMsgs - messages.size();
-        CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
-        for (int i = 0; i < consumers.size(); i++) {
-            final int consumerCount = i;
-            for (int j = 0; j < totalProducedMsgs; j++) {
-                consumers.get(i).receiveAsync().thenAccept(m -> {
-                    result.add(m.getMessageId());
-                    try {
-                        consumers.get(consumerCount).acknowledge(m);
-                    } catch (PulsarClientException e) {
-                        fail("failed to ack msg", e);
-                    }
-                    latch.countDown();
-                });
+            // try to consume remaining messages
+            Set<MessageId> result = ConcurrentHashMap.newKeySet();
+            int expectedRemainingMessages = totalProducedMsgs - 
messages.size();
+            CountDownLatch latch = new 
CountDownLatch(expectedRemainingMessages);
+            for (int i = 0; i < consumers.size(); i++) {
+                final int consumerCount = i;
+                for (int j = 0; j < totalProducedMsgs; j++) {
+                    consumers.get(i).receiveAsync().thenAccept(m -> {
+                        result.add(m.getMessageId());
+                        try {
+                            consumers.get(consumerCount).acknowledge(m);
+                        } catch (PulsarClientException e) {
+                            fail("failed to ack msg", e);
+                        }
+                        latch.countDown();
+                    });
+                }
             }
-        }
 
-        latch.await(10, TimeUnit.SECONDS);
+            latch.await(10, TimeUnit.SECONDS);
 
-        // total received-messages should match to produced messages (it may 
have duplicate messages)
-        assertEquals(result.size(), expectedRemainingMessages);
+            // total received-messages should match to produced messages (it 
may have duplicate messages)
+            assertEquals(result.size(), expectedRemainingMessages);
 
-        producer.close();
-        consumers.forEach(c -> {
-            try {
-                c.close();
-            } catch (PulsarClientException e) {
-            }
-        });
+            producer.close();
+            consumers.forEach(c -> {
+                try {
+                    c.close();
+                } catch (PulsarClientException e) {
+                }
+            });
+        } finally {
+            getConfig().setMaxUnackedMessagesPerSubscription(origValue);
+        }
     }
 
     @Test(timeOut = 20000)
     public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
-        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
         waitCacheInit(topicName);
         Integer max = 
admin.topics().getMaxUnackedMessagesOnConsumer(topicName);
@@ -197,22 +178,22 @@ public class MaxUnackedMessagesTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testMaxUnackedMessagesOnConsumerAppliedApi() throws Exception {
-        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
         waitCacheInit(topicName);
         Integer max = 
admin.topics().getMaxUnackedMessagesOnConsumer(topicName, true);
-        assertEquals(max.intValue(), 
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer());
+        assertEquals(max.intValue(), 
getConfig().getMaxUnackedMessagesPerConsumer());
 
-        admin.namespaces().setMaxUnackedMessagesPerConsumer(myNamespace, 15);
+        admin.namespaces().setMaxUnackedMessagesPerConsumer(getNamespace(), 
15);
         Awaitility.await().untilAsserted(()
-                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace).intValue(),
 15));
-        admin.namespaces().removeMaxUnackedMessagesPerConsumer(myNamespace);
+                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace()).intValue(),
 15));
+        admin.namespaces().removeMaxUnackedMessagesPerConsumer(getNamespace());
         Awaitility.await().untilAsserted(()
-                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace), 
null));
+                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace()),
 null));
 
-        admin.namespaces().setMaxUnackedMessagesPerConsumer(myNamespace, 10);
+        admin.namespaces().setMaxUnackedMessagesPerConsumer(getNamespace(), 
10);
         Awaitility.await().untilAsserted(()
-                -> 
assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace)));
+                -> 
assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace())));
         max = admin.topics().getMaxUnackedMessagesOnConsumer(topicName, true);
         assertEquals(max.intValue(), 10);
 
@@ -225,36 +206,37 @@ public class MaxUnackedMessagesTest extends 
ProducerConsumerBase {
 
     @Test
     public void testMaxUnackedMessagesOnSubApplied() throws Exception {
-        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String topicName = newTopicName();
         waitCacheInit(topicName);
-        
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
+        
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()));
         
assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
         
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName, 
true),
-                Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription()));
+                
Integer.valueOf(getConfig().getMaxUnackedMessagesPerSubscription()));
 
-        admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, 
10);
+        
admin.namespaces().setMaxUnackedMessagesPerSubscription(getNamespace(), 10);
         Awaitility.await().untilAsserted(()
-                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace),
+                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()),
                 Integer.valueOf(10)));
 
         admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 20);
         Awaitility.await().untilAsserted(()
-                -> 
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName), 
Integer.valueOf(20)));
+                -> 
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName),
+                Integer.valueOf(20)));
 
         admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
         Awaitility.await().untilAsserted(()
-                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace),
+                -> 
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()),
                 Integer.valueOf(10)));
 
-        
admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace);
+        
admin.namespaces().removeMaxUnackedMessagesPerSubscription(getNamespace());
         
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName, 
true),
-                Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription()));
+                
Integer.valueOf(getConfig().getMaxUnackedMessagesPerSubscription()));
     }
 
     @Test(timeOut = 30000)
     public void testMaxUnackedMessagesOnConsumer() throws Exception {
-        final String topicName = testTopic + System.currentTimeMillis();
-        final String subscriberName = "test-sub" + System.currentTimeMillis();
+        final String topicName = newTopicName();
+        final String subscriberName = "test-sub";
         final int unackMsgAllowed = 100;
         final int receiverQueueSize = 10;
         final int totalProducedMsgs = 300;
@@ -335,6 +317,5 @@ public class MaxUnackedMessagesTest extends 
ProducerConsumerBase {
 
     private void waitCacheInit(String topicName) throws Exception {
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
-        TopicName topic = TopicName.get(topicName);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 2b45d955926..094a5591831 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -25,28 +25,26 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
@@ -57,11 +55,10 @@ import org.testng.annotations.Test;
 // are verified on the RGs.
 @Slf4j
 @Test(groups = "flaky")
-public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
+public class RGUsageMTAggrWaitForAllMsgsTest extends SharedPulsarBaseTest {
+    @org.testng.annotations.BeforeClass(alwaysRun = true)
+    public void setupRG() throws Exception {
+        PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
         this.prepareForOps();
 
         ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() 
{
@@ -87,12 +84,6 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
         Thread.sleep(2000);
     }
 
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test
     public void testMTProduceConsumeRGUsagePersistentTopicNamesSameTenant() 
throws Exception {
         testProduceConsumeUsageOnRG(persistentTopicNamesSameTenantAndNsRGs);
@@ -547,7 +538,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
                                        int scaleFactor, boolean checkProduce,
                                        boolean checkConsume) throws Exception {
 
-        BrokerService bs = pulsar.getBrokerService();
+        BrokerService bs = 
SharedPulsarCluster.get().getPulsarService().getBrokerService();
         Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
 
         log.debug("verifyProdConsStats: topicStatsMap has {} entries", 
topicStatsMap.size());
@@ -779,7 +770,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
             new org.apache.pulsar.common.policies.data.ResourceGroup();
     private ResourceGroupService rgservice;
 
-    private final String clusterName = "test";
+    private final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
     private static final String BaseRGName = "rg-";
     private static final String BaseTestTopicName = "rgusage-topic-";
 
@@ -837,8 +828,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
     long residualRecvdNumMessages;
 
     // Create the topics provided
-    private void createTopics(String[] topics) {
-        BrokerService bs = this.pulsar.getBrokerService();
+    private void createTopics(String[] topics) throws Exception {
+        BrokerService bs = 
SharedPulsarCluster.get().getPulsarService().getBrokerService();
         for (String topic : topics) {
             if (!createdTopics.contains(topic)) {
                 bs.getOrCreateTopic(topic);
@@ -848,8 +839,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
     }
 
     // Destroy the topics provided
-    private void destroyTopics(String[] topics) {
-        BrokerService bs = this.pulsar.getBrokerService();
+    private void destroyTopics(String[] topics) throws Exception {
+        BrokerService bs = 
SharedPulsarCluster.get().getPulsarService().getBrokerService();
         for (String topic : topics) {
             if (!createdTopics.contains(topic)) {
                 bs.deleteTopic(topic, true);
@@ -873,10 +864,11 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends 
ProducerConsumerBase {
     }
 
     // Initial set up for transport manager and cluster creation.
-    private void prepareForOps() throws PulsarAdminException {
-        
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
-        this.conf.setAllowAutoTopicCreation(true);
-        admin.clusters().createCluster(clusterName, 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+    private void prepareForOps() throws Exception {
+        SharedPulsarCluster.get().getPulsarService().getConfiguration()
+                
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+        
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
+        // Cluster already created by SharedPulsarCluster
     }
 
     // Set up of RG/tenant/namespaces/topic names, and checking of the test 
parameters.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index ef20ac9d8f0..9c5aa8d7487 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -23,37 +23,34 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
+public class ResourceGroupUsageAggregationTest extends SharedPulsarBaseTest {
+    @org.testng.annotations.BeforeClass(alwaysRun = true)
+    public void setupRG() throws Exception {
+        PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
         this.prepareData();
 
         ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() 
{
@@ -74,12 +71,6 @@ public class ResourceGroupUsageAggregationTest extends 
ProducerConsumerBase {
         this.rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, 
transportMgr, dummyQuotaCalc);
     }
 
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test
     public void testProduceConsumeUsageOnRG() throws Exception {
         testProduceConsumeUsageOnRG(produceConsumePersistentTopic);
@@ -183,7 +174,8 @@ public class ResourceGroupUsageAggregationTest extends 
ProducerConsumerBase {
 
         consumer.close();
         // cleanup the topic data.
-        CompletableFuture<Optional<Topic>> topicFuture = 
pulsar.getBrokerService().getTopics().remove(topicString);
+        CompletableFuture<Optional<Topic>> topicFuture =
+                
SharedPulsarCluster.get().getPulsarService().getBrokerService().getTopics().remove(topicString);
         if (topicFuture != null) {
             Optional<Topic> optTopic = topicFuture.join();
             if (optTopic.isPresent()) {
@@ -210,8 +202,8 @@ public class ResourceGroupUsageAggregationTest extends 
ProducerConsumerBase {
                              int sentNumBytes, int sentNumMsgs,
                              int recvdNumBytes, int recvdNumMsgs,
                              boolean checkProduce, boolean checkConsume)
-                                                                throws 
InterruptedException, PulsarAdminException {
-        BrokerService bs = pulsar.getBrokerService();
+                                                                throws 
Exception {
+        BrokerService bs = 
SharedPulsarCluster.get().getPulsarService().getBrokerService();
         Awaitility.await().untilAsserted(() -> {
             TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
             Assert.assertNotNull(topicStats);
@@ -272,15 +264,15 @@ public class ResourceGroupUsageAggregationTest extends 
ProducerConsumerBase {
     private static final int PUBLISH_INTERVAL_SECS = 300;
 
     // Initial set up for transport manager and producer/consumer 
clusters/tenants/namespaces/topics.
-    private void prepareData() throws PulsarAdminException {
-        
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+    private void prepareData() throws Exception {
+        SharedPulsarCluster.get().getPulsarService().getConfiguration()
+                
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
 
-        this.conf.setAllowAutoTopicCreation(true);
+        
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
 
-        final String clusterName = "test";
-        admin.clusters().createCluster(clusterName, 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
-            admin.tenants().createTenant(tenantName,
-                    new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), 
Sets.newHashSet(clusterName)));
+        final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
+        admin.tenants().createTenant(tenantName,
+                new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), 
Sets.newHashSet(clusterName)));
         admin.namespaces().createNamespace(tenantAndNsName);
         admin.namespaces().setNamespaceReplicationClusters(tenantAndNsName, 
Sets.newHashSet(clusterName), false);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index f38871f71b3..c9540f06037 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -18,89 +18,37 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static 
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.schema.Schemas;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class ClientGetSchemaTest extends ProducerConsumerBase {
-
-    private static final String topicBytes = "my-property/my-ns/topic-bytes";
-    private static final String topicString = "my-property/my-ns/topic-string";
-    private static final String topicJson = "my-property/my-ns/topic-json";
-    private static final String topicAvro = "my-property/my-ns/topic-avro";
-    private static final String topicJsonNotNull = 
"my-property/my-ns/topic-json-not-null";
-    private static final String topicAvroNotNull = 
"my-property/my-ns/topic-avro-not-null";
-
-    List<Producer<?>> producers = new ArrayList<>();
+public class ClientGetSchemaTest extends SharedPulsarBaseTest {
 
     private static class MyClass {
         public String name;
         public int age;
     }
 
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-
-        // Create few topics with different types
-        
producers.add(pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create());
-        
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
-        
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
-        
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
-        
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder()
-                .withPojo(MyClass.class).build())).topic(topicAvro).create());
-        
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder()
-                .withPojo(MyClass.class).build())).topic(topicJson).create());
-        
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder()
-                
.withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
-        
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder()
-                
.withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());
-
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        producers.forEach(t -> {
-            try {
-                t.close();
-            } catch (PulsarClientException e) {
-            }
-        });
-        super.internalCleanup();
-    }
-
     @DataProvider(name = "serviceUrl")
     public Object[] serviceUrls() {
         return new Object[] {
-                stringSupplier(() -> getPulsar().getBrokerServiceUrl()),
-                stringSupplier(() -> getPulsar().getWebServiceAddress())
+                stringSupplier(() -> getBrokerServiceUrl()),
+                stringSupplier(() -> getWebServiceUrl())
         };
     }
 
@@ -110,6 +58,19 @@ public class ClientGetSchemaTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "serviceUrl")
     public void testGetSchema(Supplier<String> serviceUrl) throws Exception {
+        String topicBytes = newTopicName();
+        String topicString = newTopicName();
+        String topicJson = newTopicName();
+        String topicAvro = newTopicName();
+
+        // Create topics with different schema types
+        @Cleanup Producer<byte[]> pBytes = 
pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create();
+        @Cleanup Producer<String> pString = 
pulsarClient.newProducer(Schema.STRING).topic(topicString).create();
+        @Cleanup Producer<MyClass> pAvro =
+                
pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create();
+        @Cleanup Producer<MyClass> pJson =
+                
pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create();
+
         @Cleanup
         PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
 
@@ -123,51 +84,44 @@ public class ClientGetSchemaTest extends 
ProducerConsumerBase {
     /**
      * It validates if schema ledger is deleted or non recoverable then it 
will clean up schema storage for the topic
      * and make the topic available.
-     *
-     * @throws Exception
      */
     @Test
     public void testSchemaFailure() throws Exception {
-        final String tenant = PUBLIC_TENANT;
-        final String namespace = "test-namespace-" + randomName(16);
-        final String topicOne = "test-broken-schema-storage";
-        final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), 
tenant, namespace, topicOne).toString();
-
-        admin.namespaces().createNamespace(tenant + "/" + namespace, 
Sets.newHashSet("test"));
+        final String topicOne = newTopicName();
 
         // (1) create topic with schema
         Producer<Schemas.PersonTwo> producer = pulsarClient
-                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
                         
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
-                .topic(fqtnOne).create();
+                .topic(topicOne).create();
 
         producer.close();
 
-        String key = TopicName.get(fqtnOne).getSchemaName();
-        BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) 
pulsar.getSchemaStorage();
-        long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+        String key = TopicName.get(topicOne).getSchemaName();
+        BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage) 
getSchemaStorage();
+        long schemaLedgerId = schemaStorage.getSchemaLedgerList(key).get(0);
 
         // (2) break schema locator by deleting schema-ledger
-        schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+        schemaStorage.getBookKeeper().deleteLedger(schemaLedgerId);
 
-        admin.topics().unload(fqtnOne);
+        admin.topics().unload(topicOne);
 
         // (3) create topic again: broker should handle broken schema and load 
the topic successfully
         producer = pulsarClient
-                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
                         
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
-                .topic(fqtnOne).create();
+                .topic(topicOne).create();
 
-        assertNotEquals(schemaLedgerId, 
schemaStrogate.getSchemaLedgerList(key).get(0));
+        assertNotEquals(schemaLedgerId, 
schemaStorage.getSchemaLedgerList(key).get(0));
 
         Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
         personTwo.setId(1);
         personTwo.setName("Tom");
 
         Consumer<Schemas.PersonTwo> consumer = pulsarClient
-                .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
+                
.newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
                         
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
-                .subscriptionName("test").topic(fqtnOne).subscribe();
+                .subscriptionName("test").topic(topicOne).subscribe();
 
         producer.send(personTwo);
 
@@ -181,33 +135,34 @@ public class ClientGetSchemaTest extends 
ProducerConsumerBase {
 
     @Test
     public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception {
-        final String tenant = PUBLIC_TENANT;
-        final String namespace = "test-namespace-" + randomName(16);
-        final String topicOne = "test-deleted-schema-ledger";
-        final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), 
tenant, namespace, topicOne).toString();
+        final String topicOne = newTopicName();
 
-        pulsar.getConfig().setSchemaLedgerForceRecovery(true);
-        admin.namespaces().createNamespace(tenant + "/" + namespace, 
Sets.newHashSet("test"));
+        boolean origValue = getConfig().isSchemaLedgerForceRecovery();
+        getConfig().setSchemaLedgerForceRecovery(true);
+        try {
+            // (1) create topic with schema
+            Producer<Schemas.PersonTwo> producer = pulsarClient
+                    
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
+                            
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+                    .topic(topicOne).create();
 
-        // (1) create topic with schema
-        Producer<Schemas.PersonTwo> producer = pulsarClient
-                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo> 
builder().withAlwaysAllowNull(false)
-                        
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
-                .topic(fqtnOne).create();
+            producer.close();
 
-        producer.close();
-
-        String key = TopicName.get(fqtnOne).getSchemaName();
-        BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage) 
pulsar.getSchemaStorage();
-        long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+            String key = TopicName.get(topicOne).getSchemaName();
+            BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage) 
getSchemaStorage();
+            long schemaLedgerId = 
schemaStorage.getSchemaLedgerList(key).get(0);
 
-        // (2) break schema locator by deleting schema-ledger
-        schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+            // (2) break schema locator by deleting schema-ledger
+            schemaStorage.getBookKeeper().deleteLedger(schemaLedgerId);
 
-        admin.topics().unload(fqtnOne);
+            admin.topics().unload(topicOne);
 
-        Producer<byte[]> producerWihtoutSchema = 
pulsarClient.newProducer().topic(fqtnOne).create();
+            @Cleanup
+            Producer<byte[]> producerWithoutSchema = 
pulsarClient.newProducer().topic(topicOne).create();
 
-        assertNotNull(producerWihtoutSchema);
+            assertNotNull(producerWithoutSchema);
+        } finally {
+            getConfig().setSchemaLedgerForceRecovery(origValue);
+        }
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
index 9974b834621..d5924ecb510 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
@@ -24,31 +24,19 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.common.stats.JvmMetrics;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class BookieClientsStatsGeneratorTest extends BrokerTestBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class BookieClientsStatsGeneratorTest extends SharedPulsarBaseTest {
 
     @Test
     public void testBookieClientStatsGenerator() throws Exception {
         // should not generate any NPE or other exceptions..
-        Map<String, Map<String, PendingBookieOpsStats>> stats = 
BookieClientStatsGenerator.generate(super.getPulsar());
+        Map<String, Map<String, PendingBookieOpsStats>> stats =
+                
BookieClientStatsGenerator.generate(SharedPulsarCluster.get().getPulsarService());
         assertTrue(stats.isEmpty());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index ae0298ade3d..c186dd1f032 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -36,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -43,30 +43,14 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class ConsumerRedeliveryTest extends ProducerConsumerBase {
+public class ConsumerRedeliveryTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerRedeliveryTest.class);
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        conf.setManagedLedgerCacheEvictionIntervalMs(10000);
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @DataProvider(name = "ackReceiptEnabled")
     public Object[][] ackReceiptEnabled() {
         return new Object[][] { { true }, { false } };
@@ -97,76 +81,81 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
      */
     @Test(dataProvider = "ackReceiptEnabled")
     public void testOrderedRedelivery(boolean ackReceiptEnabled) throws 
Exception {
-        String topic = "persistent://my-property/my-ns/redelivery-" + 
System.currentTimeMillis();
-
-        conf.setManagedLedgerMaxEntriesPerLedger(2);
-        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
-
-        @Cleanup
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .producerName("my-producer-name")
-                .create();
-        ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
-                .subscriptionType(SubscriptionType.Shared)
-                .isAckReceiptEnabled(ackReceiptEnabled);
-        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
consumerBuilder.subscribe();
-
-        final int totalMsgs = 100;
-
-        for (int i = 0; i < totalMsgs; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
-        }
-
+        String topic = newTopicName();
+
+        int origMaxEntries = getConfig().getManagedLedgerMaxEntriesPerLedger();
+        int origMinRollover = 
getConfig().getManagedLedgerMinLedgerRolloverTimeMinutes();
+        getConfig().setManagedLedgerMaxEntriesPerLedger(2);
+        getConfig().setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        try {
+            @Cleanup
+            Producer<byte[]> producer = pulsarClient.newProducer()
+                    .topic(topic)
+                    .producerName("my-producer-name")
+                    .create();
+            ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
+                    .subscriptionType(SubscriptionType.Shared)
+                    .isAckReceiptEnabled(ackReceiptEnabled);
+            ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
consumerBuilder.subscribe();
+
+            final int totalMsgs = 100;
+
+            for (int i = 0; i < totalMsgs; i++) {
+                String message = "my-message-" + i;
+                producer.send(message.getBytes());
+            }
 
-        int consumedCount = 0;
-        Set<MessageId> messageIds = new HashSet<>();
-        for (int i = 0; i < totalMsgs; i++) {
-            Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
-            if (message != null && (consumedCount % 2) == 0) {
-                consumer1.acknowledge(message);
-            } else {
-                messageIds.add(message.getMessageId());
+            int consumedCount = 0;
+            Set<MessageId> messageIds = new HashSet<>();
+            for (int i = 0; i < totalMsgs; i++) {
+                Message<byte[]> message = consumer1.receive(5, 
TimeUnit.SECONDS);
+                if (message != null && (consumedCount % 2) == 0) {
+                    consumer1.acknowledge(message);
+                } else {
+                    messageIds.add(message.getMessageId());
+                }
+                consumedCount += 1;
             }
-            consumedCount += 1;
-        }
-        assertEquals(totalMsgs, consumedCount);
-
-        // redeliver all unack messages
-        consumer1.redeliverUnacknowledgedMessages(messageIds);
-
-        MessageIdImpl lastMsgId = null;
-        for (int i = 0; i < totalMsgs / 2; i++) {
-            Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
-            MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
-            if (lastMsgId != null) {
-                assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(), 
"lastMsgId: "
-                        + lastMsgId + " -- msgId: " + msgId);
+            assertEquals(totalMsgs, consumedCount);
+
+            // redeliver all unack messages
+            consumer1.redeliverUnacknowledgedMessages(messageIds);
+
+            MessageIdImpl lastMsgId = null;
+            for (int i = 0; i < totalMsgs / 2; i++) {
+                Message<byte[]> message = consumer1.receive(5, 
TimeUnit.SECONDS);
+                MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
+                if (lastMsgId != null) {
+                    assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(), 
"lastMsgId: "
+                            + lastMsgId + " -- msgId: " + msgId);
+                }
+                lastMsgId = msgId;
             }
-            lastMsgId = msgId;
-        }
 
-        // close consumer so, this consumer's unack messages will be 
redelivered to new consumer
-        consumer1.close();
-
-        @Cleanup
-        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
-        lastMsgId = null;
-        for (int i = 0; i < totalMsgs / 2; i++) {
-            Message<byte[]> message = consumer2.receive(5, TimeUnit.SECONDS);
-            MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
-            if (lastMsgId != null) {
-                assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
+            // close consumer so, this consumer's unack messages will be 
redelivered to new consumer
+            consumer1.close();
+
+            @Cleanup
+            Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+            lastMsgId = null;
+            for (int i = 0; i < totalMsgs / 2; i++) {
+                Message<byte[]> message = consumer2.receive(5, 
TimeUnit.SECONDS);
+                MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
+                if (lastMsgId != null) {
+                    assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
+                }
+                lastMsgId = msgId;
             }
-            lastMsgId = msgId;
+        } finally {
+            getConfig().setManagedLedgerMaxEntriesPerLedger(origMaxEntries);
+            
getConfig().setManagedLedgerMinLedgerRolloverTimeMinutes(origMinRollover);
         }
     }
 
     @Test(dataProvider = "ackReceiptEnabled")
     public void testUnAckMessageRedeliveryWithReceiveAsync(boolean 
ackReceiptEnabled)
             throws PulsarClientException, ExecutionException, 
InterruptedException {
-        String topic = "persistent://my-property/my-ns/async-unack-redelivery";
+        String topic = newTopicName();
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("s1")
@@ -222,12 +211,10 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
     @Test
     public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
 
-        log.info("-- Starting {} test --", methodName);
-
         final int queueSize = 10;
         int batchSize = 100;
         String subName = "my-subscriber-name";
-        String topicName = "permitReceiveBatchMessages" + 
(UUID.randomUUID().toString());
+        String topicName = newTopicName();
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
                 .subscribe();
@@ -251,27 +238,22 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
         }
         producer.flush();
 
-        retryStrategically((test) -> {
-            return consumer1.getTotalIncomingMessages() == batchSize;
-        }, 5, 2000);
-
-        assertEquals(consumer1.getTotalIncomingMessages(), batchSize);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(consumer1.getTotalIncomingMessages(), batchSize));
 
         ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
                 .subscribe();
 
-        retryStrategically((test) -> {
-            return consumer2.getTotalIncomingMessages() == queueSize;
-        }, 5, 2000);
-        assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
-        log.info("-- Exiting {} test --", methodName);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(consumer2.getTotalIncomingMessages(), queueSize));
     }
 
     @Test(timeOut = 30000)
     public void testMessageRedeliveryWhenTimeoutInListener() throws Exception {
+
         final String subName = 
"sub_testMessageRedeliveryWhenTimeoutInListener";
-        final String topicName = "testMessageRedeliveryWhenTimeoutInListener" 
+ UUID.randomUUID();
+        final String topicName = newTopicName();
 
         final int messages = 10;
 
@@ -326,7 +308,7 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
     public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() 
throws Exception {
 
         final String subName = "my-subscriber-name";
-        final String topicName = 
"testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final int messages = 100;
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -376,8 +358,7 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000, dataProvider = "batchedMessageAck")
     public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType 
ackType) throws Exception {
-        String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
-                + numAcked + "-" + batchSize + "-" + ackType.getValue();
+        String topic = newTopicName();
         @Cleanup Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
                 .subscriptionName("sub")
@@ -394,30 +375,30 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
             String value = "msg-" + i;
             producer.sendAsync(value).thenAccept(id -> log.info("{} was sent 
to {}", value, id));
         }
-        List<Message<String>> messages = new ArrayList<>();
+        List<Message<String>> msgs = new ArrayList<>();
         for (int i = 0; i < batchSize; i++) {
-            messages.add(consumer.receive());
+            msgs.add(consumer.receive());
         }
         if (ackType == CommandAck.AckType.Individual) {
             for (int i = 0; i < numAcked; i++) {
-                consumer.acknowledge(messages.get(i));
+                consumer.acknowledge(msgs.get(i));
             }
         } else {
-            consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+            consumer.acknowledgeCumulative(msgs.get(numAcked - 1));
         }
 
         consumer.redeliverUnacknowledgedMessages();
 
-        messages.clear();
+        msgs.clear();
         for (int i = 0; i < batchSize; i++) {
             Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
             if (msg == null) {
                 break;
             }
             log.info("Received {} from {}", msg.getValue(), 
msg.getMessageId());
-            messages.add(msg);
+            msgs.add(msg);
         }
-        List<String> values = 
messages.stream().map(Message::getValue).collect(Collectors.toList());
+        List<String> values = 
msgs.stream().map(Message::getValue).collect(Collectors.toList());
         // All messages are redelivered because only if the whole batch are 
acknowledged would the message ID be
         // added into the ACK tracker.
         if (numAcked < batchSize) {
@@ -429,7 +410,7 @@ public class ConsumerRedeliveryTest extends 
ProducerConsumerBase {
 
     @Test
     public void testRedeliverMessagesWithoutValue() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
+        String topic = newTopicName();
         @Cleanup Consumer<Integer> consumer = 
pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("sub")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 832e814e4ee..31c92583a90 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -51,7 +51,7 @@ import java.util.regex.Pattern;
 import lombok.Cleanup;
 import lombok.Data;
 import org.apache.avro.reflect.Nullable;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -66,30 +66,14 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class DeadLetterTopicTest extends ProducerConsumerBase {
+public class DeadLetterTopicTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(DeadLetterTopicTest.class);
 
-    @BeforeMethod(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        this.conf.setMaxMessageSize(5 * 1024);
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     private String createMessagePayload(int size) {
         StringBuilder str = new StringBuilder();
         Random rand = new Random();
@@ -101,7 +85,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicWithMessageKey() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -118,9 +102,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -161,7 +145,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -178,9 +162,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -222,7 +206,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -239,9 +223,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -283,7 +267,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicMessagesWithEventTime() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -300,9 +284,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -344,11 +328,13 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
     }
 
     public void testDeadLetterTopicWithProducerName() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
         final String subscription = "my-subscription";
         final String consumerName = "my-consumer";
+        // Extract the short topic name from the full topic for matching
+        String shortTopicName = topic.substring(topic.lastIndexOf('/') + 1);
         Pattern deadLetterProducerNamePattern =
-                
Pattern.compile("^persistent://my-property/my-ns/dead-letter-topic"
+                Pattern.compile("^persistent://" + 
Pattern.quote(getNamespace()) + "/" + Pattern.quote(shortTopicName)
                         + "-my-subscription"
                         + "-my-consumer"
                         + "-[a-zA-Z0-9]{5}"
@@ -370,9 +356,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -413,7 +399,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testMultipleSameNameConsumersToDeadLetterTopic() throws 
Exception {
-        final String topic = 
"persistent://my-property/my-ns/same-name-consumers-dead-letter-topic";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 1;
         final int messageCount = 10;
         final int consumerCount = 3;
@@ -492,7 +478,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "produceLargeMessages")
     public void testDeadLetterTopic(boolean produceLargeMessages) throws 
Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 2;
 
@@ -509,9 +495,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -579,7 +565,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testDeadLetterTopicHasOriginalInfo() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
         final int sendMessages = 10;
@@ -595,9 +581,9 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -655,13 +641,13 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testAutoConsumeSchemaDeadLetter() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
         final String subName = "my-subscription";
         final int maxRedeliveryCount = 1;
         final int sendMessages = 10;
 
         admin.topics().createNonPartitionedTopic(topic);
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<FooV2> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.AVRO(FooV2.class))
                 .topic(topic + "-" + subName + "-DLQ")
                 .subscriptionName("my-subscription")
@@ -725,7 +711,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 1;
         final int messageCount = 10;
         final int consumerCount = 3;
@@ -801,8 +787,8 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
      */
     @Test(enabled = false)
     public void testDeadLetterTopicWithMultiTopic() throws Exception {
-        final String topic1 = 
"persistent://my-property/my-ns/dead-letter-topic-1";
-        final String topic2 = 
"persistent://my-property/my-ns/dead-letter-topic-2";
+        final String topic1 = newTopicName();
+        final String topic2 = newTopicName();
 
         final int maxRedeliveryCount = 2;
 
@@ -821,8 +807,8 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
         // subscribe to the DLQ topics before consuming original topics
         Consumer<byte[]> deadLetterConsumer = 
pulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ",
-                        
"persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
+                .topic(topic1 + "-my-subscription-DLQ",
+                        topic2 + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -883,9 +869,11 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test(groups = "quarantine")
     public void testDeadLetterTopicByCustomTopicName() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 2;
         final int sendMessages = 100;
+        final String customDlqTopic = "persistent://" + getNamespace()
+                + "/dead-letter-custom-topic-my-subscription-custom-DLQ";
 
         // subscribe before publish
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
@@ -896,15 +884,14 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .receiverQueueSize(100)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
-                        .deadLetterTopic("persistent://my-property/my-ns/"
-                                + 
"dead-letter-custom-topic-my-subscription-custom-DLQ")
+                        .deadLetterTopic(customDlqTopic)
                         .build())
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+                .topic(customDlqTopic)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
@@ -933,7 +920,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         deadLetterConsumer.close();
         consumer.close();
         @Cleanup
-        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient1 = newPulsarClient();
         Consumer<byte[]> checkConsumer = 
newPulsarClient1.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -954,7 +941,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
      */
     @Test(timeOut = 200000)
     public void testDeadLetterWithoutConsumerReceiveImmediately() throws 
PulsarClientException, InterruptedException {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
+        final String topic = newTopicName();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
@@ -979,7 +966,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType() 
throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 2;
 
@@ -1001,15 +988,13 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         Consumer<byte[]> deadLetterConsumer0 = 
pulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/"
-                        + 
"dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ")
+                .topic(topic + "-partition-0-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
         Consumer<byte[]> deadLetterConsumer1 = 
pulsarClient.newConsumer(Schema.BYTES)
-                .topic("persistent://my-property/my-ns/"
-                        + 
"dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ")
+                .topic(topic + "-partition-1-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -1080,7 +1065,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicWithInitialSubscription() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -1103,7 +1088,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                 .topic(topic)
@@ -1125,10 +1110,10 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
             totalReceived++;
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
 
-        String deadLetterTopic = 
"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
+        String deadLetterTopic = topic + "-my-subscription-DLQ";
         Awaitility.await().atMost(Duration.ofSeconds(10))
                 .pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-            
assertTrue(admin.namespaces().getTopics("my-property/my-ns").contains(deadLetterTopic));
+            
assertTrue(admin.namespaces().getTopics(getNamespace()).contains(deadLetterTopic));
             
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
         });
 
@@ -1153,7 +1138,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test()
     public void testDeadLetterTopicWithProducerBuilder() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-with-producer-builder";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 2;
         final int sendMessages = 100;
 
@@ -1180,7 +1165,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic + "-" + subscriptionName + "-DLQ")
                 .subscriptionName(subscriptionNameDLQ)
@@ -1283,7 +1268,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers() 
throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 1;
 
@@ -1319,7 +1304,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                 .topic(topic)
@@ -1336,10 +1321,10 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                         consumerReceiveForDLQ(otherConsumer, totalReceived, 
sendMessages, maxRedeliveryCount))
                 .get(10, TimeUnit.SECONDS);
 
-        String deadLetterTopic = 
"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
+        String deadLetterTopic = topic + "-my-subscription-DLQ";
         Awaitility.await().atMost(Duration.ofSeconds(10))
                 .pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-            
assertTrue(admin.namespaces().getTopics("my-property/my-ns").contains(deadLetterTopic));
+            
assertTrue(admin.namespaces().getTopics(getNamespace()).contains(deadLetterTopic));
             
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
         });
 
@@ -1396,7 +1381,6 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         long number;
 
         public PayloadIncompatible() {
-
         }
 
         public PayloadIncompatible(long number) {
@@ -1407,8 +1391,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
     // reproduce issue reported in 
https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
     @Test
     public void 
testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws 
Exception {
-        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
-        admin.namespaces().createNamespace(namespace);
+        String namespace = getNamespace();
         // don't enforce schema validation
         admin.namespaces().setSchemaValidationEnforced(namespace, false);
         // set schema compatibility strategy to always compatible
@@ -1416,8 +1399,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
         Schema<Payload> schema = Schema.AVRO(Payload.class);
         Schema<PayloadIncompatible> schemaIncompatible = 
Schema.AVRO(PayloadIncompatible.class);
-        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
-                        + 
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+        String topic = newTopicName();
         String dlqTopic = topic + "-DLQ";
 
         // create topics
@@ -1447,7 +1429,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
             Thread.sleep(2000L);
 
-            
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+            assertThat(getTopicReference(dlqTopic).get().getProducers().size())
                     .describedAs("producer count of dlq topic %s should be <= 
1 so that it doesn't leak producers",
                             dlqTopic)
                     .isLessThanOrEqualTo(1);
@@ -1462,7 +1444,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
             }
         }
 
-        
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+        assertThat(getTopicReference(dlqTopic).get().getProducers().size())
                 .describedAs("producer count of dlq topic %s should be 0 here",
                         dlqTopic)
                 .isEqualTo(0);
@@ -1470,8 +1452,8 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
     @Test
     public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws 
Exception {
-        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking";
-        final String dlq = 
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking-my-subscription-DLQ";
+        final String topic = newTopicName();
+        final String dlq = topic + "-my-subscription-DLQ";
         final int maxRedeliveryCount = 3;
         final int maxUnackedMessages = 100;
         final int sendMessages = 1000;
@@ -1565,14 +1547,12 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
     // reproduce issue reported in 
https://github.com/apache/pulsar/issues/24541
     @Test
     public void sendDeadLetterTopicWithMismatchSchemaProducer() throws 
Exception {
-        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
-        admin.namespaces().createNamespace(namespace);
+        String namespace = getNamespace();
         // don't enforce schema validation
         admin.namespaces().setSchemaValidationEnforced(namespace, false);
         // set schema compatibility strategy to always compatible
         admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
-        final String topic = BrokerTestUtil.newUniqueName("persistent://" + 
namespace
-                + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+        final String topic = newTopicName();
         final String retryTopic = topic + "-RETRY";
         final String deadLetterTopic = topic + "-DLQ";
         final Long deadLetterMessageValue = 1234567890L;
@@ -1606,7 +1586,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
         Thread.sleep(3000L);
 
-        assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+        assertThat(getTopicReference(topic).get()
                 
.getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
                 .describedAs("redeliver count of topic %s should be less than 
or equal to 2 because of mismatch schema",
                         topic)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
index 48089aae7c2..238ab5b7577 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -25,37 +25,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
+public class PatternMultiTopicsConsumerTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(PatternMultiTopicsConsumerTest.class);
 
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        isTcpLookup = true;
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test(timeOut = 5000)
     public void testSimple() throws Exception {
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/topic.*")
+        String ns = getNamespace();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern("persistent://" + ns + "/topic.*")
                 // Make sure topics are discovered before test times out
                 .patternAutoDiscoveryPeriod(2, TimeUnit.SECONDS)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -65,7 +51,9 @@ public class PatternMultiTopicsConsumerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 5000)
     public void testNotifications() throws Exception {
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/topic.*")
+        String ns = getNamespace();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topicsPattern("persistent://" + ns + "/topic.*")
                 // Set auto-discovery period high so that only notifications 
can inform us about new topics
                 .patternAutoDiscoveryPeriod(1, TimeUnit.MINUTES)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -74,9 +62,10 @@ public class PatternMultiTopicsConsumerTest extends 
ProducerConsumerBase {
     }
 
     private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
+        String ns = getNamespace();
         Map<String, List<String>> sentMessages = new HashMap<>();
         for (int p = 0; p < 10; ++p) {
-            String name = "persistent://my-property/my-ns/topic-" + p;
+            String name = "persistent://" + ns + "/topic-" + p;
             Producer<byte[]> producer = 
pulsarClient.newProducer().topic(name).create();
             for (int i = 0; i < 10; i++) {
                 String message = "message-" + p + i;
@@ -100,9 +89,10 @@ public class PatternMultiTopicsConsumerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testFailedSubscribe() throws Exception {
-        final String topicName1 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
-        final String topicName2 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
-        final String topicName3 = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
+        String ns = getNamespace();
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" 
+ ns + "/tp_test");
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" 
+ ns + "/tp_test");
+        final String topicName3 = BrokerTestUtil.newUniqueName("persistent://" 
+ ns + "/tp_test");
         final String subName = "s1";
         admin.topics().createPartitionedTopic(topicName1, 2);
         admin.topics().createPartitionedTopic(topicName2, 3);
@@ -116,7 +106,7 @@ public class PatternMultiTopicsConsumerTest extends 
ProducerConsumerBase {
         try {
             PatternMultiTopicsConsumerImpl<String> consumer =
                 (PatternMultiTopicsConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
-                    .topicsPattern("persistent://public/default/tp_test.*")
+                    .topicsPattern("persistent://" + ns + "/tp_test.*")
                     .subscriptionType(SubscriptionType.Failover)
                     .subscriptionName(subName)
                     .subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
index 1726101929b..fbd595f9bba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
@@ -23,50 +23,27 @@ import static 
org.apache.pulsar.client.api.SubscriptionType.Failover;
 import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
 import static org.apache.pulsar.client.api.SubscriptionType.Shared;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
-public class UnloadSubscriptionTest extends ProducerConsumerBase {
-
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        conf.setSystemTopicEnabled(false);
-        conf.setTransactionCoordinatorEnabled(false);
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class UnloadSubscriptionTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "unloadCases")
     public Object[][] unloadCases (){
@@ -94,7 +71,7 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "unloadCases")
     public void testSingleConsumer(int msgCount, boolean enabledBatch, int 
maxMsgPerBatch, SubscriptionType subType,
                                    int ackMsgCount) throws Exception {
-        final String topicName = "persistent://my-property/my-ns/tp-" + 
UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subName = "sub";
         Consumer<String> consumer = createConsumer(topicName, subName, 
subType);
         ProducerAndMessageIds producerAndMessageIds =
@@ -103,8 +80,7 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
                 toString(producerAndMessageIds.messageIds));
 
         // Receive all messages and ack some.
-        MessagesEntry messagesEntry = receiveAllMessages(consumer);
-        assertEquals(messagesEntry.messageSet.size(), msgCount);
+        MessagesEntry messagesEntry = receiveMessages(consumer, msgCount);
         if (ackMsgCount > 0){
             LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
             Iterator<MessageId> messageIdIterator = 
messagesEntry.messageIdSet.iterator();
@@ -115,20 +91,18 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
             log.info("ack message-ids: {}", 
toString(ackedMessageIds.stream().toList()));
         }
 
-
         // Unload subscriber.
         PersistentTopic persistentTopic = getPersistentTopic(topicName);
         persistentTopic.unloadSubscription(subName);
         // Receive all messages for the second time.
-        MessagesEntry messagesEntryForTheSecondTime = 
receiveAllMessages(consumer);
+        int expectedAfterUnload = msgCount - ackMsgCount;
+        MessagesEntry messagesEntryForTheSecondTime = 
receiveMessages(consumer, expectedAfterUnload);
         log.info("received message-ids for the second time: {}",
                 
toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
-        assertEquals(messagesEntryForTheSecondTime.messageSet.size(), msgCount 
- ackMsgCount);
 
         // cleanup.
         producerAndMessageIds.producer.close();
         consumer.close();
-        admin.topics().delete(topicName);
     }
 
     @Test(dataProvider = "unloadCases")
@@ -137,7 +111,7 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
         if (subType == Exclusive){
             return;
         }
-        final String topicName = "persistent://my-property/my-ns/tp-" + 
UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subName = "sub";
         Consumer<String> consumer1 = createConsumer(topicName, subName, 
subType);
         Consumer<String> consumer2 = createConsumer(topicName, subName, 
subType);
@@ -147,18 +121,11 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
                 toString(producerAndMessageIds.messageIds));
 
         // Receive all messages and ack some.
-        MessagesEntry messagesEntry1 = receiveAllMessages(consumer1);
-        MessagesEntry messagesEntry2 = receiveAllMessages(consumer2);
-        LinkedHashSet<String> allMessages = new LinkedHashSet<>();
-        allMessages.addAll(messagesEntry1.messageSet);
-        allMessages.addAll(messagesEntry2.messageSet);
-        assertEquals(allMessages.size(), msgCount);
+        List<Consumer<String>> consumers = List.of(consumer1, consumer2);
+        MessagesEntry messagesEntry = receiveMessages(consumers, msgCount);
         if (ackMsgCount > 0){
-            LinkedHashSet<MessageId> allMessageIds = new LinkedHashSet<>();
             LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
-            allMessageIds.addAll(messagesEntry1.messageIdSet);
-            allMessageIds.addAll(messagesEntry2.messageIdSet);
-            Iterator<MessageId> messageIdIterator = allMessageIds.iterator();
+            Iterator<MessageId> messageIdIterator = 
messagesEntry.messageIdSet.iterator();
             for (int i = ackMsgCount; i > 0; i--){
                 ackedMessageIds.add(messageIdIterator.next());
             }
@@ -171,23 +138,16 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
         persistentTopic.unloadSubscription(subName);
 
         // Receive all messages for the second time.
-        MessagesEntry messagesEntryForTheSecondTime1 = 
receiveAllMessages(consumer1);
-        MessagesEntry messagesEntryForTheSecondTime2 = 
receiveAllMessages(consumer2);
-        LinkedHashSet<String> allMessagesForTheSecondTime = new 
LinkedHashSet<>();
-        
allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime1.messageSet);
-        
allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime2.messageSet);
-        LinkedHashSet<MessageId> allMessageIdsForTheSecondTime = new 
LinkedHashSet<>();
-        allMessageIdsForTheSecondTime.addAll(messagesEntry1.messageIdSet);
-        allMessageIdsForTheSecondTime.addAll(messagesEntry2.messageIdSet);
+        int expectedAfterUnload = msgCount - ackMsgCount;
+        MessagesEntry messagesEntryForTheSecondTime = 
receiveMessages(consumers, expectedAfterUnload);
         log.info("received message-ids for the second time: {}",
-                toString(allMessageIdsForTheSecondTime.stream().toList()));
-        assertEquals(allMessagesForTheSecondTime.size(), msgCount - 
ackMsgCount);
+                
toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
+        assertEquals(messagesEntryForTheSecondTime.messageSet.size(), 
expectedAfterUnload);
 
         // cleanup.
         producerAndMessageIds.producer.close();
         consumer1.close();
         consumer2.close();
-        admin.topics().delete(topicName);
     }
 
     private static String toString(List<MessageId> messageIds){
@@ -214,7 +174,7 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
     }
 
     private PersistentTopic getPersistentTopic(String topicName) {
-        return (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, 
false).join().get();
+        return (PersistentTopic) getTopic(topicName, false).join().get();
     }
 
     private ProducerAndMessageIds createProducerAndSendMessages(String 
topicName, int msgCount, boolean enabledBatch,
@@ -246,20 +206,64 @@ public class UnloadSubscriptionTest extends 
ProducerConsumerBase {
         return consumer;
     }
 
-    private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws 
Exception {
-        final Set<String> messageSet = Collections.synchronizedSet(new 
LinkedHashSet<>());
-        final Set<MessageId> messageIdSet = Collections.synchronizedSet(new 
LinkedHashSet<>());
-        while (true) {
-            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
-            if (msg == null){
-                break;
-            }
+    private MessagesEntry receiveMessages(Consumer<String> consumer, int 
expectedCount) throws Exception {
+        final LinkedHashSet<String> messageSet = new LinkedHashSet<>();
+        final LinkedHashSet<MessageId> messageIdSet = new LinkedHashSet<>();
+        for (int i = 0; i < expectedCount; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg, "Expected " + expectedCount + " messages but 
only received " + i);
             messageIdSet.add(msg.getMessageId());
             messageSet.add(msg.getValue());
         }
         return new MessagesEntry(messageSet, messageIdSet);
     }
 
-    private record MessagesEntry(Set<String> messageSet, Set<MessageId> 
messageIdSet) {}
+    private MessagesEntry receiveMessages(List<Consumer<String>> consumers, 
int expectedCount) throws Exception {
+        final LinkedHashSet<String> messageSet = new LinkedHashSet<>();
+        final LinkedHashSet<MessageId> messageIdSet = new LinkedHashSet<>();
+        // Use receiveAsync on all consumers concurrently and collect until we 
have enough
+        List<CompletableFuture<Message<String>>> pending = new ArrayList<>();
+        for (Consumer<String> c : consumers) {
+            pending.add(c.receiveAsync());
+        }
+        long deadline = System.currentTimeMillis() + 30_000;
+        while (messageSet.size() < expectedCount && System.currentTimeMillis() 
< deadline) {
+            CompletableFuture<Object> any = 
CompletableFuture.anyOf(pending.toArray(new CompletableFuture[0]));
+            long remaining = deadline - System.currentTimeMillis();
+            if (remaining <= 0) {
+                break;
+            }
+            try {
+                any.get(remaining, TimeUnit.MILLISECONDS);
+            } catch (java.util.concurrent.TimeoutException e) {
+                break;
+            }
+            // Collect all completed futures
+            List<CompletableFuture<Message<String>>> newPending = new 
ArrayList<>();
+            for (int i = 0; i < pending.size(); i++) {
+                CompletableFuture<Message<String>> f = pending.get(i);
+                if (f.isDone()) {
+                    Message<String> msg = f.join();
+                    messageIdSet.add(msg.getMessageId());
+                    messageSet.add(msg.getValue());
+                    if (messageSet.size() < expectedCount) {
+                        newPending.add(consumers.get(i).receiveAsync());
+                    }
+                } else {
+                    newPending.add(f);
+                }
+            }
+            pending = newPending;
+        }
+        assertEquals(messageSet.size(), expectedCount,
+                "Expected " + expectedCount + " messages but received " + 
messageSet.size());
+        // Cancel any remaining pending futures
+        for (CompletableFuture<Message<String>> f : pending) {
+            f.cancel(false);
+        }
+        return new MessagesEntry(messageSet, messageIdSet);
+    }
+
+    private record MessagesEntry(LinkedHashSet<String> messageSet, 
LinkedHashSet<MessageId> messageIdSet) {}
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index ea399af2bc2..5016c0eee77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -38,31 +38,17 @@ import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.tests.EnumValuesDataProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test
-public class MessageIdTest extends BrokerTestBase {
+public class MessageIdTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(MessageIdTest.class);
 
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception {
-        baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
-
     @Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class, 
dataProvider = "values")
     public void producerSendAsync(TopicType topicType) throws 
PulsarClientException, PulsarAdminException {
         // Given
         String key = "producerSendAsync-" + topicType;
-        final String topicName = "persistent://my-property/my-ns/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-subscription-" + key;
         final String messagePrefix = "my-message-" + key + "-";
         final int numberOfMessages = 30;
@@ -129,7 +115,7 @@ public class MessageIdTest extends BrokerTestBase {
     public void producerSend(TopicType topicType) throws 
PulsarClientException, PulsarAdminException {
         // Given
         String key = "producerSend-" + topicType;
-        final String topicName = "persistent://my-property/my-ns/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-subscription-" + key;
         final String messagePrefix = "my-message-" + key + "-";
         final int numberOfMessages = 30;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index eddcd393a8b..5b38af8637d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -29,32 +29,18 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase {
+public class PerMessageUnAcknowledgedRedeliveryTest extends 
SharedPulsarBaseTest {
     private static final long testTimeout = 90000; // 1.5 min
     private static final Logger log = 
LoggerFactory.getLogger(PerMessageUnAcknowledgedRedeliveryTest.class);
     private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
 
-    @Override
-    @BeforeMethod
-    public void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test(timeOut = testTimeout)
     public void testSharedAckedNormalTopic() throws Exception {
         String key = "testSharedAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 15;
@@ -152,7 +138,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
     @Test(timeOut = testTimeout)
     public void testUnAckedMessageTrackerSize() throws Exception {
         String key = "testUnAckedMessageTrackerSize";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 15;
@@ -194,7 +180,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
     @Test(timeOut = testTimeout)
     public void testExclusiveAckedNormalTopic() throws Exception {
         String key = "testExclusiveAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 15;
@@ -292,7 +278,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
     @Test(timeOut = testTimeout)
     public void testFailoverAckedNormalTopic() throws Exception {
         String key = "testFailoverAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 15;
@@ -396,7 +382,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
     @Test(timeOut = testTimeout)
     public void testSharedAckedPartitionedTopic() throws Exception {
         String key = "testSharedAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
+        final String topicName = "persistent://" + getNamespace() + "/topic-" 
+ key;
         final String subscriptionName = "my-ex-subscription-" + key;
         final String messagePredicate = "my-message-" + key + "-";
         final int totalMessages = 15;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index e3bd36a8914..87f2a9b131d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -25,10 +25,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -36,26 +35,11 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class ProducerCloseTest extends ProducerConsumerBase {
-
-    @Override
-    @BeforeMethod
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ProducerCloseTest extends SharedPulsarBaseTest {
 
     /**
      * Param1: Producer enableBatch or not
@@ -85,10 +69,11 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
 
     @Test(dataProvider = "brokenPipeline")
     public void testProducerCloseCallback2(boolean brokenPipeline) throws 
Exception {
-        initClient();
         @Cleanup
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-                .topic("testProducerClose")
+        PulsarClient client = newPulsarClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+                .topic(newTopicName())
                 .sendTimeout(5, TimeUnit.SECONDS)
                 .maxPendingMessages(0)
                 .enableBatching(false)
@@ -100,23 +85,22 @@ public class ProducerCloseTest extends 
ProducerConsumerBase {
         producer.closeAsync();
         Thread.sleep(3000);
         if (brokenPipeline) {
-            //producer.getClientCnx().channel().config().setAutoRead(true);
             producer.getClientCnx().channel().close();
         } else {
             producer.getClientCnx().channel().config().setAutoRead(true);
         }
         Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
-            System.out.println(1);
             Assert.assertTrue(completableFuture.isDone());
         });
     }
 
     @Test(timeOut = 10_000)
     public void testProducerCloseCallback() throws Exception {
-        initClient();
         @Cleanup
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-                .topic("testProducerClose")
+        PulsarClient client = newPulsarClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+                .topic(newTopicName())
                 .sendTimeout(5, TimeUnit.SECONDS)
                 .maxPendingMessages(0)
                 .enableBatching(false)
@@ -127,7 +111,7 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
         final CompletableFuture<MessageId> completableFuture = 
value.sendAsync();
         producer.closeAsync();
         final CommandSuccess commandSuccess = new CommandSuccess();
-        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+        PulsarClientImpl clientImpl = (PulsarClientImpl) client;
         commandSuccess.setRequestId(clientImpl.newRequestId() - 1);
         producer.getClientCnx().handleSuccess(commandSuccess);
         Thread.sleep(3000);
@@ -136,10 +120,11 @@ public class ProducerCloseTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 10_000)
     public void 
testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws 
Exception {
-        initClient();
         @Cleanup
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-                .topic("testProducerClose")
+        PulsarClient client = newPulsarClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
client.newProducer()
+                .topic(newTopicName())
                 .maxPendingMessages(10)
                 .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
                 .batchingMaxBytes(Integer.MAX_VALUE)
@@ -165,9 +150,9 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
         PulsarClient longBackOffClient = PulsarClient.builder()
                 .startingBackoffInterval(5, TimeUnit.SECONDS)
                 .maxBackoffInterval(5, TimeUnit.SECONDS)
-                .serviceUrl(lookupUrl.toString())
+                .serviceUrl(getBrokerServiceUrl())
                 .build();
-        String topic = "broker-close-test-" + 
RandomStringUtils.randomAlphabetic(5);
+        String topic = newTopicName();
         @Cleanup
         ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
longBackOffClient.newProducer()
                 .topic(topic)
@@ -175,8 +160,7 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
                 .create();
         producer.newMessage().value("test".getBytes()).send();
 
-        Optional<Topic> topicOptional = pulsar.getBrokerService()
-                
.getTopicReference(TopicName.get(topic).getPartitionedTopicName());
+        Optional<Topic> topicOptional = 
getTopicReference(TopicName.get(topic).getPartitionedTopicName());
         Assert.assertTrue(topicOptional.isPresent());
         topicOptional.get().close(true).get();
         Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(producer.getState(), HandlerState.State.Connecting));
@@ -186,10 +170,4 @@ public class ProducerCloseTest extends 
ProducerConsumerBase {
             producer.newMessage().value("test".getBytes()).send();
         }
     }
-
-    private void initClient() throws PulsarClientException {
-        replacePulsarClient(PulsarClient.builder().
-                serviceUrl(lookupUrl.toString()));
-    }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index f528c827549..c71331026fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -19,46 +19,35 @@
 package org.apache.pulsar.client.impl;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
  * Tests for not exists topic.
  */
 @Test(groups = "broker-impl")
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class TopicDoesNotExistsTest extends SharedPulsarBaseTest {
 
-    @Override
-    @BeforeClass
-    public void setup() throws Exception {
-        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
-        isTcpLookup = true;
-        conf.setAllowAutoTopicCreation(false);
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
+    @BeforeMethod(alwaysRun = true)
+    public void disableAutoTopicCreation() throws Exception {
+        admin.namespaces().setAutoTopicCreation(getNamespace(),
+                
AutoTopicCreationOverride.builder().allowAutoTopicCreation(false).build());
     }
 
     @Test
     public void testCreateProducerOnNotExistsTopic() throws 
PulsarClientException, InterruptedException {
         @Cleanup
-        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(getBrokerServiceUrl()).build();
         try {
-            pulsarClient.newProducer()
-                    .topic("persistent://public/default/" + 
UUID.randomUUID().toString())
+            client.newProducer()
+                    .topic(newTopicName())
                     .sendTimeout(100, TimeUnit.MILLISECONDS)
                     .create();
             Assert.fail("Create producer should failed while topic does not 
exists.");
@@ -66,26 +55,29 @@ public class TopicDoesNotExistsTest extends 
ProducerConsumerBase {
             Assert.assertTrue(e instanceof 
PulsarClientException.TopicDoesNotExistException);
         }
         Thread.sleep(2000);
-        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
client).timer();
         Assert.assertEquals(timer.pendingTimeouts(), 0);
-        Assert.assertEquals(((PulsarClientImpl) 
pulsarClient).producersCount(), 0);
+        Assert.assertEquals(((PulsarClientImpl) client).producersCount(), 0);
     }
 
     @Test
     public void testCreateConsumerOnNotExistsTopic() throws 
PulsarClientException, InterruptedException {
         @Cleanup
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .operationTimeout(1, TimeUnit.SECONDS)
+                .build();
         try {
-            pulsarClient.newConsumer()
-                    .topic("persistent://public/default/" + 
UUID.randomUUID().toString())
+            client.newConsumer()
+                    .topic(newTopicName())
                     .subscriptionName("test")
                     .subscribe();
             Assert.fail("Create consumer should failed while topic does not 
exists.");
         } catch (PulsarClientException ignore) {
         }
         Thread.sleep(2000);
-        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
client).timer();
         Assert.assertEquals(timer.pendingTimeouts(), 0);
-        Assert.assertEquals(((PulsarClientImpl) 
pulsarClient).consumersCount(), 0);
+        Assert.assertEquals(((PulsarClientImpl) client).consumersCount(), 0);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
index 3d0f696716e..092626db5c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.processor;
 
-import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,21 +26,17 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -50,24 +45,7 @@ import org.testng.annotations.Test;
  */
 @Slf4j
 @Test(groups = "broker-impl")
-public class MessagePayloadProcessorTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        admin.clusters().createCluster("test",
-                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid"), 
Sets.newHashSet("test")));
-        admin.namespaces().createNamespace("public/default", 
Sets.newHashSet("test"));
-    }
-
-    @AfterClass
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class MessagePayloadProcessorTest extends SharedPulsarBaseTest {
 
     @DataProvider
     public static Object[][] config() {
@@ -92,7 +70,7 @@ public class MessagePayloadProcessorTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "config")
     public void testDefaultProcessor(int numPartitions, boolean 
enableBatching, int batchingMaxMessages)
             throws Exception {
-        final String topic = "testDefaultProcessor-" + numPartitions + "-" + 
enableBatching + "-" + batchingMaxMessages;
+        final String topic = newTopicName();
         final int numMessages = 10;
         final String messagePrefix = "msg-";
 
@@ -188,8 +166,10 @@ public class MessagePayloadProcessorTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "customBatchConfig")
     public void testCustomProcessor(final int numMessages, final int 
batchingMaxMessages) throws Exception {
-        final String topic = "persistent://public/default/testCustomProcessor-"
-                + numMessages + "-" + batchingMaxMessages;
+        // Disable dedup: CustomBatchProducer writes directly to managed 
ledger without
+        // producer name/sequence ID, causing NPE in MessageDeduplication
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
+        final String topic = newTopicName();
 
         @Cleanup
         final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
@@ -200,7 +180,7 @@ public class MessagePayloadProcessorTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         final PersistentTopic persistentTopic =
-                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topic).get().orElse(null);
+                (PersistentTopic) getTopicIfExists(topic).get().orElse(null);
         Assert.assertNotNull(persistentTopic);
 
         final String messagePrefix = "msg-";

Reply via email to