This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 765c46e2747 [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25338)
765c46e2747 is described below

commit 765c46e274782cea3001430fce56f5862030abf2
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Mar 21 14:14:08 2026 -0700

    [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25338)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../broker/service/ConsumedLedgersTrimTest.java    |   7 +-
 .../service/CurrentLedgerRolloverIfFullTest.java   |   2 +
 .../pulsar/broker/service/SharedPulsarCluster.java |   1 +
 .../pulsar/client/api/ClientDeduplicationTest.java |  69 +++----
 .../apache/pulsar/client/api/InterceptorsTest.java | 101 +++++-----
 .../api/PartitionedProducerConsumerTest.java       | 216 ++++++++++-----------
 .../client/api/PersistentTopicTerminateTest.java   |   6 +-
 .../apache/pulsar/client/api/RetryTopicTest.java   | 131 ++++++-------
 .../apache/pulsar/client/api/TopicReaderTest.java  | 134 +++++++------
 .../impl/ConsumerDecryptFailListenerTest.java      |  40 +---
 .../client/impl/MessageChunkingSharedTest.java     |  30 +--
 .../impl/PartialPartitionedProducerTest.java       |  31 +--
 .../pulsar/client/impl/ProducerMemoryLeakTest.java |  39 +---
 .../client/impl/PulsarMultiHostClientTest.java     |  49 +++--
 14 files changed, 376 insertions(+), 480 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 7d673decd2a..d789948fcd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -50,6 +50,8 @@ public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
 
     @Test
     public void testConsumedLedgersTrim() throws Exception {
+        // Disable dedup so the pulsar.dedup cursor doesn't block ledger trimming
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
         // Set infinite retention at namespace level so ledgers are preserved until explicitly trimmed
         admin.namespaces().setRetention(getNamespace(), new RetentionPolicies(-1, -1));
 
@@ -106,6 +108,8 @@ public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
 
     @Test
     public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
+        // Disable dedup so the pulsar.dedup cursor doesn't block ledger trimming
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
         // Set infinite retention at namespace level so ledgers are preserved until explicitly trimmed
         admin.namespaces().setRetention(getNamespace(), new RetentionPolicies(-1, -1));
 
@@ -178,6 +182,8 @@ public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
 
     @Test
     public void testAdminTrimLedgers() throws Exception {
+        // Disable dedup so the pulsar.dedup cursor doesn't block ledger trimming
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
         final String subscriptionName = "my-sub";
         final int maxEntriesPerLedger = 2;
         final int partitionedNum = 3;
@@ -230,7 +236,6 @@ public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
                     "Expected at most 2 ledgers after trim, but found "
                             + managedLedger.getLedgersInfoAsList().size());
         });
-
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 226456df597..be23800e6d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -37,6 +37,8 @@ public class CurrentLedgerRolloverIfFullTest extends SharedPulsarBaseTest {
 
     @Test
     public void testCurrentLedgerRolloverIfFull() throws Exception {
+        // Disable dedup so the pulsar.dedup cursor doesn't block ledger trimming
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
         final String topicName = newTopicName();
 
         @Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index d67c8194edf..a561712b8ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -146,6 +146,7 @@ public class SharedPulsarCluster {
         config.setForceDeleteNamespaceAllowed(true);
         config.setForceDeleteTenantAllowed(true);
         config.setBrokerDeleteInactiveTopicsEnabled(false);
+        config.setBrokerDeduplicationEnabled(true);
 
         // Reduce thread pool sizes for faster startup (fewer threads to create)
         config.setNumIOThreads(2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 4e96252056d..aaaca5a8bba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -34,18 +34,17 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
-public class ClientDeduplicationTest extends ProducerConsumerBase {
+public class ClientDeduplicationTest extends SharedPulsarBaseTest {
 
     @DataProvider
     public static Object[][] batchingTypes() {
@@ -55,22 +54,9 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         };
     }
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test(priority = -1)
     public void testNamespaceDeduplicationApi() throws Exception {
-        final String namespace = "my-property/my-ns";
+        final String namespace = getNamespace();
         assertNull(admin.namespaces().getDeduplicationStatus(namespace));
         admin.namespaces().setDeduplicationStatus(namespace, true);
         Awaitility.await().untilAsserted(() -> 
assertTrue(admin.namespaces().getDeduplicationStatus(namespace)));
@@ -82,8 +68,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test
     public void testProducerSequenceAfterReconnect() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        final String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
                 .producerName("my-producer-name");
@@ -113,8 +99,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test
     public void testProducerSequenceAfterRestart() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testProducerSequenceAfterRestart";
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
                 .producerName("my-producer-name");
@@ -130,8 +116,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
         producer.close();
 
-        // Kill and restart broker
-        restartBroker();
+        // Unload topic to force reload of dedup state
+        admin.topics().unload(topic);
 
         producer = producerBuilder.create();
         assertEquals(producer.getLastSequenceId(), 9L);
@@ -147,8 +133,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testProducerDeduplication() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testProducerDeduplication";
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         // Set infinite timeout
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
@@ -180,8 +166,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
         assertNull(msg);
 
-        // Kill and restart broker
-        restartBroker();
+        // Unload topic to force reload of dedup state
+        admin.topics().unload(topic);
 
         producer = producerBuilder.create();
         assertEquals(producer.getLastSequenceId(), 2L);
@@ -198,9 +184,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000, dataProvider = "batchingTypes")
     public void 
testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder 
batcherBuilder) throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
-                + System.currentTimeMillis();
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         // Set infinite timeout
         ProducerBuilder<byte[]> producerBuilder =
@@ -244,8 +229,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         assertNull(msg);
 
         producer.close();
-        // Kill and restart broker
-        restartBroker();
+        // Unload topic to force reload of dedup state
+        admin.topics().unload(topic);
 
         producer = producerBuilder.create();
         assertEquals(producer.getLastSequenceId(), 6L);
@@ -263,8 +248,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testProducerDeduplicationNonBatchAsync() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync";
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         // Set infinite timeout
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topic)
@@ -295,8 +280,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
         assertNull(msg);
 
-        // Kill and restart broker
-        restartBroker();
+        // Unload topic to force reload of dedup state
+        admin.topics().unload(topic);
 
         producer = producerBuilder.create();
         assertEquals(producer.getLastSequenceId(), 5L);
@@ -313,8 +298,8 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testKeyBasedBatchingOrder() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/test-key-based-batching-order";
-        admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+        final String topic = newTopicName();
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -379,13 +364,13 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
 
     @Test
     public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
+        final String topic = newTopicName();
         int totalMessage = 200;
         int threadSize = 5;
-        String topicName = "subscription";
+        String subscriptionName = "subscription";
         @Cleanup("shutdownNow")
         ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
-        conf.setBrokerDeduplicationEnabled(true);
+        admin.namespaces().setDeduplicationStatus(getNamespace(), true);
 
         //build producer/consumer
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -397,7 +382,7 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionType(SubscriptionType.Exclusive)
-                .subscriptionName(topicName)
+                .subscriptionName(subscriptionName)
                 .subscribe();
 
         CountDownLatch countDownLatch = new CountDownLatch(threadSize);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 19665de29a1..bb2cdeb0309 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.api;
 
-import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -33,8 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
@@ -45,29 +43,14 @@ import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class InterceptorsTest extends ProducerConsumerBase {
+public class InterceptorsTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(InterceptorsTest.class);
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @DataProvider(name = "receiverQueueSize")
     public Object[][] getReceiverQueueSize() {
         return new Object[][] { { 0 }, { 1000 } };
@@ -81,18 +64,16 @@ public class InterceptorsTest extends ProducerConsumerBase {
         return new Object[][] {{ 0 }, { 3 }};
     }
 
-    @DataProvider(name = "topics")
-    public Object[][] getTopics() {
-        return new Object[][] {{ 
List.of("persistent://my-property/my-ns/my-topic") },
-                { List.of("persistent://my-property/my-ns/my-topic", 
"persistent://my-property/my-ns/my-topic1") }};
+    @DataProvider(name = "topicCount")
+    public Object[][] getTopicCount() {
+        return new Object[][] {{ 1 }, { 2 }};
     }
 
     @Test
     public void testProducerInterceptor() throws Exception {
         Map<MessageId, List<String>> ackCallback = new HashMap<>();
 
-        String ns = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
-        admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
+        String ns = getNamespace();
         admin.namespaces().setSchemaCompatibilityStrategy(ns, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
 
         abstract class BaseInterceptor implements
@@ -164,6 +145,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testProducerInterceptorsWithExceptions() throws 
PulsarClientException {
+        final String topicName = newTopicName();
         ProducerInterceptor<String> interceptor = new 
ProducerInterceptor<String>() {
             @Override
             public void close() {
@@ -182,7 +164,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
             }
         };
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-            .topic("persistent://my-property/my-ns/my-topic")
+            .topic(topicName)
             .intercept(interceptor)
             .create();
 
@@ -193,6 +175,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testProducerInterceptorsWithErrors() throws 
PulsarClientException {
+        final String topicName = newTopicName();
         ProducerInterceptor<String> interceptor = new 
ProducerInterceptor<String>() {
             @Override
             public void close() {
@@ -211,7 +194,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
             }
         };
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .intercept(interceptor)
                 .create();
 
@@ -222,6 +205,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testProducerInterceptorAccessMessageData() throws 
PulsarClientException {
+        final String topicName = newTopicName();
         List<String> messageDataInBeforeSend = 
Collections.synchronizedList(new ArrayList<>());
         List<String> messageDataOnSendAcknowledgement = 
Collections.synchronizedList(new ArrayList<>());
         ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() {
@@ -243,7 +227,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
         };
         @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .intercept(interceptor)
                 .create();
 
@@ -264,6 +248,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testConsumerInterceptorWithErrors() throws 
PulsarClientException {
+        final String topicName = newTopicName();
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
             @Override
             public void close() {
@@ -296,7 +281,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
             }
         };
         Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription-ack-timeout")
@@ -304,14 +289,14 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
                 .subscribe();
 
         Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription-negative")
                 .subscribe();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic-exception")
+                .topic(topicName)
                 .create();
 
         producer.newMessage().value("Hello Pulsar!").send();
@@ -337,6 +322,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "receiverQueueSize")
     public void testConsumerInterceptorWithSingleTopicSubscribe(Integer 
receiverQueueSize) throws Exception {
+        final String topicName = newTopicName();
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
             @Override
             public void close() {
@@ -372,7 +358,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
         };
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription")
@@ -380,7 +366,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
                 .subscribe();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .enableBatching(false)
                 .create();
 
@@ -413,7 +399,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
         final CompletableFuture<Message<String>> future = new 
CompletableFuture<>();
         consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription")
@@ -446,6 +432,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testConsumerInterceptorWithMultiTopicSubscribe() throws 
PulsarClientException {
+        final String topicName = newTopicName();
+        final String topicName1 = newTopicName();
 
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
             @Override
@@ -482,15 +470,15 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .create();
 
         Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic1")
+                .topic(topicName1)
                 .create();
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic", 
"persistent://my-property/my-ns/my-topic1")
+                .topic(topicName, topicName1)
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription")
@@ -527,7 +515,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
         AtomicInteger beforeConsumeCount = new AtomicInteger(0);
         PulsarClient client = PulsarClient.builder()
-                .serviceUrl(lookupUrl.toString())
+                .serviceUrl(getBrokerServiceUrl())
                 .listenerThreads(1)
                 .build();
 
@@ -560,7 +548,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
             }
         };
 
-        final String topicName = "persistent://my-property/my-ns/my-topic";
+        final String topicName = newTopicName();
 
         if (partitions > 0) {
             admin.topics().createPartitionedTopic(topicName, partitions);
@@ -584,7 +572,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
                 .subscribe();
 
         Producer<String> producer = client.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .create();
 
         final int messages = 10;
@@ -602,6 +590,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testConsumerInterceptorWithPatternTopicSubscribe() throws 
PulsarClientException {
+        final String topicName = newTopicName();
+        final String topicName1 = newTopicName();
 
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
             @Override
@@ -638,15 +628,15 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .create();
 
         Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic1")
+                .topic(topicName1)
                 .create();
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topicsPattern("persistent://my-property/my-ns/my-.*")
+                .topicsPattern("persistent://" + getNamespace() + "/.*")
                 .subscriptionType(SubscriptionType.Shared)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription")
@@ -674,6 +664,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test
     public void testConsumerInterceptorForAcknowledgeCumulative() throws 
PulsarClientException {
+        final String topicName = newTopicName();
 
         List<MessageId> ackHolder = new ArrayList<>();
 
@@ -715,14 +706,14 @@ public class InterceptorsTest extends 
ProducerConsumerBase {
         };
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .subscriptionType(SubscriptionType.Failover)
                 .intercept(interceptor)
                 .subscriptionName("my-subscription")
                 .subscribe();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic(topicName)
                 .create();
 
         for (int i = 0; i < 100; i++) {
@@ -748,9 +739,13 @@ public class InterceptorsTest extends ProducerConsumerBase 
{
         consumer.close();
     }
 
-    @Test(dataProvider = "topics")
-    public void testConsumerInterceptorForNegativeAcksSend(List<String> topics)
+    @Test(dataProvider = "topicCount")
+    public void testConsumerInterceptorForNegativeAcksSend(int topicCount)
             throws PulsarClientException, InterruptedException {
+        List<String> topics = new ArrayList<>();
+        for (int i = 0; i < topicCount; i++) {
+            topics.add(newTopicName());
+        }
         final int totalNumOfMessages = 100;
         CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
 
@@ -820,9 +815,13 @@ public class InterceptorsTest extends ProducerConsumerBase 
{
         consumer.close();
     }
 
-    @Test(dataProvider = "topics")
-    public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) 
throws PulsarClientException,
+    @Test(dataProvider = "topicCount")
+    public void testConsumerInterceptorForAckTimeoutSend(int topicCount) 
throws PulsarClientException,
             InterruptedException {
+        List<String> topics = new ArrayList<>();
+        for (int i = 0; i < topicCount; i++) {
+            topics.add(newTopicName());
+        }
         final int totalNumOfMessages = 100;
         CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
 
@@ -890,7 +889,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
 
     @Test(timeOut = 1000 * 30, dataProvider = "topicPartition")
     public void testReaderInterceptor(int topicPartition) throws Exception {
-        String topic = "reader-interceptor-" + topicPartition + "-" + 
RandomStringUtils.randomAlphabetic(5);
+        String topic = newTopicName();
         if (topicPartition > 0) {
             admin.topics().createPartitionedTopic(topic, topicPartition);
         }
@@ -999,7 +998,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
     @Test(dataProvider = "topicPartition")
     public void testConsumerInterceptorForOnArrive(int topicPartition) throws 
PulsarClientException,
             InterruptedException, PulsarAdminException {
-        String topicName = "persistent://my-property/my-ns/on-arrive";
+        String topicName = newTopicName();
         if (topicPartition > 0) {
             admin.topics().createPartitionedTopic(topicName, topicPartition);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index ba4054ef700..a0e487bf3ad 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.util.Timeout;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,6 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
@@ -47,48 +49,48 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
+public class PartitionedProducerConsumerTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
 
     private ExecutorService executor;
+    protected String methodName;
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
+    @BeforeMethod(alwaysRun = true)
+    public void setTestMethodName(Method m) throws Exception {
+        methodName = m.getName();
+    }
 
+    @BeforeClass(alwaysRun = true)
+    public void setupExecutor() throws Exception {
         executor = Executors.newFixedThreadPool(1, new 
DefaultThreadFactory("PartitionedProducerConsumerTest"));
     }
 
     @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
+    public void cleanupExecutor() throws Exception {
         executor.shutdownNow();
     }
 
     @Test(timeOut = 30000)
     public void testRoundRobinProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         int numPartitions = 4;
-        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-"
-                + System.currentTimeMillis());
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-partitioned-subscriber").subscribe();
-        assertEquals(consumer.getTopic(), topicName.toString());
+        assertEquals(consumer.getTopic(), topicName);
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -110,7 +112,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -119,24 +120,23 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     public void testPartitionedTopicNameWithSpecialCharacter() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
 
+        final String topicName = newTopicName();
         int numPartitions = 4;
         final String specialCharacter = "! * ' ( ) ; : @ & = + $ , \\ ? % # [ 
]";
-        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-"
-                        + System.currentTimeMillis() + specialCharacter);
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        String topicWithSpecialChar = topicName + specialCharacter;
+        admin.topics().createPartitionedTopic(topicWithSpecialChar, 
numPartitions);
 
         // Try to create producer which does lookup and create connection with 
broker
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicWithSpecialChar)
                 
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
         producer.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
         log.info("-- Exiting {} test --", methodName);
     }
 
     @Test(timeOut = 30000)
     public void testCustomPartitionProducer() throws Exception {
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
-        TopicName topicName = null;
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
         Producer<byte[]> producer = null;
         Consumer<byte[]> consumer = null;
         final int messageCount = 16;
@@ -144,16 +144,14 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             log.info("-- Starting {} test --", methodName);
 
             int numPartitions = 4;
-            topicName = TopicName
-                    
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
 
-            admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+            admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-            producer = pulsarClient.newProducer().topic(topicName.toString())
+            producer = pulsarClient.newProducer().topic(topicName)
                     .messageRouter(new AlwaysTwoMessageRouter())
                     .create();
 
-            consumer = pulsarClient.newConsumer().topic(topicName.toString())
+            consumer = pulsarClient.newConsumer().topic(topicName)
                     .subscriptionName("my-partitioned-subscriber").subscribe();
 
             for (int i = 0; i < messageCount; i++) {
@@ -178,7 +176,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             consumer.unsubscribe();
             consumer.close();
             pulsarClient.close();
-            admin.topics().deletePartitionedTopic(topicName.toString());
 
             log.info("-- Exiting {} test --", methodName);
         }
@@ -187,18 +184,17 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testSinglePartitionProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic2-" + 
System.currentTimeMillis());
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-partitioned-subscriber").subscribe();
 
         for (int i = 0; i < 10; i++) {
@@ -223,7 +219,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -231,18 +226,17 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testKeyBasedProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic3-" + 
System.currentTimeMillis());
         String dummyKey1 = "dummykey1";
         String dummyKey2 = "dummykey2";
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-partitioned-subscriber").subscribe();
 
         for (int i = 0; i < 5; i++) {
@@ -269,7 +263,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -286,11 +279,10 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 100000)
     public void testPauseAndResume() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         int numPartitions = 2;
-        String topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic-pr-" 
+ System.currentTimeMillis()).toString();
 
         admin.topics().createPartitionedTopic(topicName, numPartitions);
 
@@ -341,14 +333,13 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testPauseAndResumeWithUnloading() throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/pause-and-resume-with-unloading"
-                + System.currentTimeMillis();
+        final String topicName = newTopicName();
         final String subName = "sub";
         final int receiverQueueSize = 20;
         final int numPartitions = 2;
         final int numMessages = receiverQueueSize * numPartitions;
 
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        PulsarClient pulsarClient = newPulsarClient();
         admin.topics().createPartitionedTopic(topicName, numPartitions);
 
         AtomicReference<CountDownLatch> latch = new AtomicReference<>(new 
CountDownLatch(numMessages));
@@ -388,22 +379,20 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         producer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName, true);
     }
 
     @Test(timeOut = 30000)
     public void testInvalidSequence() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        final String topicName = newTopicName();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic4-" + 
System.currentTimeMillis());
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-subscriber-name").subscribe();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
@@ -442,17 +431,15 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             // ok
         }
 
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
     }
 
     @Test(timeOut = 30000)
     public void testSillyUser() throws Exception {
+        final String topicName = newTopicName();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic5-" + 
System.currentTimeMillis());
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
         Producer<byte[]> producer = null;
         Consumer<byte[]> consumer = null;
@@ -472,9 +459,9 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         try {
-            producer = 
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+            producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false)
                     
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
-            consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
+            consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
             producer.send("message1".getBytes());
             producer.send("message2".getBytes());
             /* Message<byte[]> msg1 = */ consumer.receive();
@@ -486,27 +473,25 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             consumer.close();
         }
 
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
     }
 
     @Test(timeOut = 30000)
     public void testDeletePartitionedTopic() throws Exception {
+        final String topicName = newTopicName();
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic6-" + 
System.currentTimeMillis());
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString()).create();
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
                 .subscribe();
         consumer.unsubscribe();
         consumer.close();
         producer.close();
 
-        admin.topics().deletePartitionedTopic(topicName.toString());
+        admin.topics().deletePartitionedTopic(topicName);
 
-        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName.toString()).create();
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName).create();
         if (producer1 instanceof PartitionedProducerImpl) {
             Assert.fail("should fail since partitioned topic was deleted");
         }
@@ -515,21 +500,20 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testAsyncPartitionedProducerConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        final String topicName = newTopicName();
 
         final int totalMsg = 100;
         final Set<String> produceMsgs = new HashSet<>();
         final Set<String> consumeMsgs = new HashSet<>();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe();
 
         // produce messages
@@ -556,7 +540,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         producer.close();
         consumer.unsubscribe();
         consumer.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -564,23 +547,22 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         final int totalMsg = 100;
         final Set<String> produceMsgs = new HashSet<>();
         final Set<String> consumeMsgs = new HashSet<>();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                .get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString())
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe();
 
         // produce messages
@@ -608,7 +590,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -621,10 +602,10 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testFairDistributionForPartitionConsumers() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         final int numPartitions = 2;
-        final String topicName = "persistent://my-property/my-ns/my-topic-" + 
System.currentTimeMillis();
         final String producer1Msg = "producer1";
         final String producer2Msg = "producer2";
         final int queueSize = 10;
@@ -672,7 +653,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName);
 
         log.info("-- Exiting {} test --", methodName);
     }
@@ -707,8 +687,8 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testGetPartitionsForTopic() throws Exception {
+        final String topic = newTopicName();
         int numPartitions = 4;
-        String topic = "persistent://my-property/my-ns/my-partitionedtopic1-" 
+ System.currentTimeMillis();
 
         admin.topics().createPartitionedTopic(topic, numPartitions);
 
@@ -719,7 +699,7 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
         assertEquals(pulsarClient.getPartitionsForTopic(topic).join(), 
expectedPartitions);
 
-        String nonPartitionedTopic = 
"persistent://my-property/my-ns/my-non-partitionedtopic1";
+        String nonPartitionedTopic = newTopicName();
 
         
assertEquals(pulsarClient.getPartitionsForTopic(nonPartitionedTopic).join(),
                 Collections.singletonList(nonPartitionedTopic));
@@ -727,9 +707,9 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testMessageIdForSubscribeToSinglePartition() throws Exception {
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
-        TopicName topicName = null;
-        TopicName partition2TopicName = null;
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
+        String partition2TopicName = 
TopicName.get(topicName).getPartition(2).toString();
         Producer<byte[]> producer = null;
         Consumer<byte[]> consumer1 = null;
         Consumer<byte[]> consumer2 = null;
@@ -739,22 +719,19 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         try {
             log.info("-- Starting {} test --", methodName);
 
-            topicName = 
TopicName.get("persistent://my-property/my-ns/my-topic-" + 
System.currentTimeMillis());
-            partition2TopicName = topicName.getPartition(2);
+            admin.topics().createPartitionedTopic(topicName, numPartitions);
 
-            admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
-
-            producer = pulsarClient.newProducer().topic(topicName.toString())
+            producer = pulsarClient.newProducer().topic(topicName)
                 .messageRouter(new AlwaysTwoMessageRouter())
                 .create();
 
-            consumer1 = pulsarClient.newConsumer().topic(topicName.toString())
+            consumer1 = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("subscriber-partitioned")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscribe();
 
-            consumer2 = 
pulsarClient.newConsumer().topic(partition2TopicName.toString())
+            consumer2 = pulsarClient.newConsumer().topic(partition2TopicName)
                 .subscriptionName("subscriber-single")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionType(SubscriptionType.Exclusive)
@@ -787,7 +764,6 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
             consumer2.unsubscribe();
             consumer2.close();
             pulsarClient.close();
-            admin.topics().deletePartitionedTopic(topicName.toString());
 
             log.info("-- Exiting {} test --", methodName);
         }
@@ -808,10 +784,10 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testAutoUpdatePartitionsForProducerConsumer() throws Exception 
{
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         final int numPartitions = 2;
-        final String topicName = "persistent://my-property/my-ns/my-topic-" + 
System.currentTimeMillis();
         final String producerMsg = "producerMsg";
         final int totalMessages = 30;
 
@@ -897,38 +873,34 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages);
 
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName);
 
         log.info("-- Exiting {} test --", methodName);
     }
 
     @Test
     public void testCustomPartitionedProducer() throws Exception {
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
-        TopicName topicName = null;
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
         Producer<byte[]> producer = null;
         try {
             log.info("-- Starting {} test --", methodName);
 
             int numPartitions = 4;
-            topicName = TopicName
-                    
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
 
-            admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+            admin.topics().createPartitionedTopic(topicName, numPartitions);
 
             RouterWithTopicName router = new RouterWithTopicName();
-            producer = pulsarClient.newProducer().topic(topicName.toString())
+            producer = pulsarClient.newProducer().topic(topicName)
                     .messageRouter(router)
                     .create();
             for (int i = 0; i < 1; i++) {
                 String message = "my-message-" + i;
                 
producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
             }
-            assertEquals(router.topicName, topicName.toString());
+            assertEquals(router.topicName, topicName);
         } finally {
             producer.close();
             pulsarClient.close();
-            admin.topics().deletePartitionedTopic(topicName.toString());
             log.info("-- Exiting {} test --", methodName);
         }
     }
@@ -942,18 +914,17 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
     @Test
     public void testPartitionedTopicInterceptor() throws Exception {
         log.info("-- Starting {} test --", methodName);
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); 
// Creates new client connection
+        final String topicName = newTopicName();
+        PulsarClient pulsarClient = newPulsarClient();
 
         int numPartitions = 4;
-        TopicName topicName = TopicName
-                
.get("persistent://my-property/my-ns/interceptor-partitionedtopic1-" + 
System.currentTimeMillis());
 
-        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
 
         AtomicInteger newProducerPartitions = new AtomicInteger(0);
         AtomicInteger newConsumerPartitions = new AtomicInteger(0);
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false)
                 .autoUpdatePartitions(true).autoUpdatePartitionsInterval(1, 
TimeUnit.SECONDS)
                 .intercept(new ProducerInterceptor<byte[]>() {
                     @Override
@@ -976,7 +947,7 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
                     }
                 
}).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-partitioned-subscriber").autoUpdatePartitionsInterval(1, 
TimeUnit.SECONDS)
                 .intercept(new ConsumerInterceptor<byte[]>() {
 
@@ -1015,7 +986,7 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
                 }).subscribe();
 
         int newPartitions = numPartitions + 5;
-        admin.topics().updatePartitionedTopic(topicName.toString(), 
newPartitions);
+        admin.topics().updatePartitionedTopic(topicName, newPartitions);
 
         Awaitility.await().atMost(10000, TimeUnit.SECONDS).until(
                 () -> newProducerPartitions.get() == newPartitions && 
newConsumerPartitions.get() == newPartitions);
@@ -1029,11 +1000,20 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.unsubscribe();
         consumer.close();
         pulsarClient.close();
-        admin.topics().deletePartitionedTopic(topicName.toString());
 
         log.info("-- Exiting {} test --", methodName);
     }
 
+    private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T 
receivedMessage,
+            T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
     private static class RouterWithTopicName implements MessageRouter {
         static String topicName = null;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
index 8e39660369b..bccae594768 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
@@ -38,6 +38,8 @@ public class PersistentTopicTerminateTest extends 
SharedPulsarBaseTest {
     public void testRecoverAfterTerminate() throws Exception {
         final String topicName = newTopicName();
         final String subscriptionName = "s1";
+        // Disable dedup so that the pulsar.dedup cursor doesn't prevent 
ledger trimming.
+        admin.namespaces().setDeduplicationStatus(getNamespace(), false);
         admin.topics().createNonPartitionedTopic(topicName);
         admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
 
@@ -64,7 +66,8 @@ public class PersistentTopicTerminateTest extends 
SharedPulsarBaseTest {
         assertEquals(msg2.getValue(), "2");
 
         // Verify: the ledgers acked will be cleaned up.
-        consumer.acknowledgeCumulative(msg2);
+        consumer.close();
+        admin.topics().skipAllMessages(topicName, subscriptionName);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
             PersistentTopic persistentTopic =
                     (PersistentTopic) getTopic(topicName, false).join().get();
@@ -76,7 +79,6 @@ public class PersistentTopicTerminateTest extends 
SharedPulsarBaseTest {
         });
 
         // Cleanup.
-        consumer.close();
         admin.topics().delete(topicName, false);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index af4786a987f..f37608f98d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -30,45 +31,29 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.Data;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.reflect.Nullable;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
 @Test(groups = "broker-api")
-public class RetryTopicTest extends ProducerConsumerBase {
+public class RetryTopicTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(RetryTopicTest.class);
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test
     public void testRetryTopic() throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final String topic = newTopicName();
 
         final int maxRedeliveryCount = 2;
 
@@ -85,9 +70,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -141,7 +126,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test
     public void testRetryTopicWithProducerBuilder() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/retry-topic-with-producer-builder";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 2;
         final int sendMessages = 100;
 
@@ -166,7 +151,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic + "-" + subscriptionName + "-DLQ")
                 .subscriptionName(subscriptionNameDLQ)
@@ -227,7 +212,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
      */
     @Test
     public void testRetryTopicWithExclusiveMode() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/retry-topic-exclusive";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 2;
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
@@ -279,7 +264,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testAutoConsumeSchemaRetryLetter() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/retry-letter-topic";
+        final String topic = newTopicName();
         final String subName = "my-subscription";
         final String retrySubName = "my-subscription" + "-RETRY";
         final int sendMessages = 10;
@@ -379,7 +364,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 60000)
     public void testRetryTopicProperties() throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final String topic = newTopicName();
 
         byte[] key = "key".getBytes();
         byte[] orderingKey = "orderingKey".getBytes();
@@ -400,9 +385,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .subscribe();
 
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -514,11 +499,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
     //Issue 9327: do compatibility check in case of the default retry and dead 
letter topic name changed
     @Test
     public void testRetryTopicNameForCompatibility () throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final String topic = newTopicName();
 
-        final String oldRetryTopic = 
"persistent://my-property/my-ns/my-subscription-RETRY";
+        final String oldRetryTopic = "persistent://" + getNamespace() + 
"/my-subscription-RETRY";
 
-        final String oldDeadLetterTopic = 
"persistent://my-property/my-ns/my-subscription-DLQ";
+        final String oldDeadLetterTopic = "persistent://" + getNamespace() + 
"/my-subscription-DLQ";
 
         final int maxRedeliveryCount = 2;
 
@@ -537,7 +522,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
                 .topic(oldDeadLetterTopic)
                 .subscriptionName("my-subscription")
@@ -598,28 +584,39 @@ public class RetryTopicTest extends ProducerConsumerBase {
      */
     @Test
     public void testRetryTopicWithMultiTopic() throws Exception {
-        final String topic1 = "persistent://my-property/my-ns/retry-topic-1";
-        final String topic2 = "persistent://my-property/my-ns/retry-topic-2";
+        final String topic1 = newTopicName();
+        final String topic2 = newTopicName();
 
         final int maxRedeliveryCount = 2;
 
-        int sendMessages = 100;
+        final int sendMessages = 100;
+        final int totalMessages = sendMessages * 2;
+        final String subscriptionName = "my-subscription";
+        // Use an explicit DLQ topic name since the auto-generated name 
depends on the
+        // alphabetical ordering of the topics in the TreeSet, which is 
unpredictable
+        // with random topic names.
+        final String dlqTopic = newTopicName() + "-DLQ";
 
         // subscribe to the original topics before publish
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic1, topic2)
-                .subscriptionName("my-subscription")
+                .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 .enableRetry(true)
-                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .deadLetterTopic(dlqTopic)
+                        .build())
                 .receiverQueueSize(100)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
-        // subscribe to the DLQ topics before consuming original topics
+        // subscribe to the DLQ topic before consuming original topics.
+        @Cleanup
         Consumer<byte[]> deadLetterConsumer = 
pulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ")
-                .subscriptionName("my-subscription")
+                .topic(dlqTopic)
+                .subscriptionName(subscriptionName)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
@@ -636,34 +633,34 @@ public class RetryTopicTest extends ProducerConsumerBase {
             producer2.send(String.format("Hello Pulsar [%d]", i).getBytes());
         }
 
-        sendMessages = sendMessages * 2;
-
         producer1.close();
         producer2.close();
 
         int totalReceived = 0;
         do {
-            Message<byte[]> message = consumer.receive();
+            Message<byte[]> message = consumer.receive(30, TimeUnit.SECONDS);
+            assertNotNull(message, "Expected more messages, received " + 
totalReceived);
             log.info("consumer received message : {} {} - total = {}",
                 message.getMessageId(), new String(message.getData()), 
++totalReceived);
             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
-        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+        } while (totalReceived < totalMessages * (maxRedeliveryCount + 1));
 
         int totalInDeadLetter = 0;
         do {
-            Message message = deadLetterConsumer.receive();
+            Message message = deadLetterConsumer.receive(30, TimeUnit.SECONDS);
+            assertNotNull(message, "Expected more DLQ messages, received " + 
totalInDeadLetter);
             log.info("dead letter consumer received message : {} {}", 
message.getMessageId(),
                     new String(message.getData()));
             deadLetterConsumer.acknowledge(message);
             totalInDeadLetter++;
-        } while (totalInDeadLetter < sendMessages);
+        } while (totalInDeadLetter < totalMessages);
 
-        deadLetterConsumer.close();
         consumer.close();
 
+        @Cleanup
         Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic1, topic2)
-                .subscriptionName("my-subscription")
+                .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -674,16 +671,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
                     new String(checkMessage.getData()));
         }
         assertNull(checkMessage);
-
-        checkConsumer.close();
     }
 
     @Test
     public void testRetryTopicByCustomTopicName() throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final String topic = newTopicName();
         final int maxRedeliveryCount = 2;
         final int sendMessages = 100;
 
+        final String customRetryTopic = "persistent://" + getNamespace() + 
"/my-subscription-custom-Retry";
+
         // subscribe before publish
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
@@ -693,14 +690,14 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .receiverQueueSize(100)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
-                        
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .retryLetterTopic(customRetryTopic)
                         .build())
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
         @Cleanup
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
-                
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+                .topic(topic + "-my-subscription-DLQ")
                 .subscriptionName("my-subscription")
                 .subscribe();
 
@@ -730,7 +727,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
         deadLetterConsumer.close();
         consumer.close();
         @Cleanup
-        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        PulsarClient newPulsarClient1 = newPulsarClient();
         Consumer<byte[]> checkConsumer = 
newPulsarClient1.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -749,8 +746,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000L)
     public void testRetryTopicException() throws Exception {
-        String retryLetterTopic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
+        final String topic = newTopicName();
+        String retryLetterTopic = topic + "-RETRY";
         final int maxRedeliveryCount = 2;
         final int sendMessages = 1;
         // subscribe before publish
@@ -791,7 +788,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000L)
     public void testRetryProducerWillCloseByConsumer() throws Exception {
-        final String topicName = "persistent://my-property/my-ns/tp_" + 
UUID.randomUUID().toString();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub1";
         final String topicRetry = topicName + "-" + subscriptionName + 
"-RETRY";
         final String topicDLQ = topicName + "-" + subscriptionName + "-DLQ";
@@ -838,8 +835,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000L)
     public void testRetryTopicExceptionWithConcurrent() throws Exception {
-        String retryLetterTopic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
-        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
+        final String topic = newTopicName();
+        String retryLetterTopic = topic + "-RETRY";
         final int maxRedeliveryCount = 2;
         final int sendMessages = 10;
         // subscribe before publish
@@ -922,18 +919,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
     // but for retry topic
     @Test
     public void 
testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws 
Exception {
-        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
-        admin.namespaces().createNamespace(namespace);
         // don't enforce schema validation
-        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        admin.namespaces().setSchemaValidationEnforced(getNamespace(), false);
         // set schema compatibility strategy to always compatible
-        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        admin.namespaces()
+                .setSchemaCompatibilityStrategy(getNamespace(), 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
 
         Schema<Payload> schema = Schema.AVRO(Payload.class);
         Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
                 PayloadIncompatible.class);
-        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
-                + 
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+        String topic = newTopicName();
         String dlqTopic = topic + "-DLQ";
         String retryTopic = topic + "-RETRY";
 
@@ -968,7 +963,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
             Thread.sleep(2000L);
 
-            
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+            
assertThat(getTopicReference(retryTopic).get().getProducers().size())
                     .describedAs("producer count of retry topic %s should be 
<= 1 so that it doesn't leak producers",
                             retryTopic)
                     .isLessThanOrEqualTo(1);
@@ -983,7 +978,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
             }
         }
 
-        
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+        assertThat(getTopicReference(retryTopic).get().getProducers().size())
                 .describedAs("producer count of retry topic %s should be 0 
here",
                         retryTopic)
                 .isEqualTo(0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6c133d6c109..6e872627fb6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -42,6 +43,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -55,26 +58,19 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")
-public class TopicReaderTest extends ProducerConsumerBase {
+public class TopicReaderTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(TopicReaderTest.class);
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
+    protected String methodName;
 
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
+    @BeforeMethod(alwaysRun = true)
+    public void setTestMethodName(Method m) {
+        methodName = m.getName();
     }
 
     @DataProvider
@@ -115,10 +111,11 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testSimpleReader() throws Exception {
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
+        final String topicName = newTopicName();
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReader")
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -143,7 +140,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testSimpleMultiReader() throws Exception {
-        String topic = "persistent://my-property/my-ns/testSimpleMultiReader";
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
 
         Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
@@ -170,8 +167,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testReaderAfterMessagesWerePublished() throws Exception {
+        final String topicName = newTopicName();
         Producer<byte[]> producer =
-                
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
+                pulsarClient.newProducer().topic(topicName)
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -179,7 +177,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
+                .topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -200,7 +198,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMultiReaderAfterMessagesWerePublished() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished";
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
                 .create();
@@ -227,8 +225,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMultipleReaders() throws Exception {
+        final String topicName = newTopicName();
         Producer<byte[]> producer =
-                
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders")
+                pulsarClient.newProducer().topic(topicName)
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -236,11 +235,11 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         }
 
         Reader<byte[]> reader1 =
-                
pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
+                pulsarClient.newReader().topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
         Reader<byte[]> reader2 =
-                
pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
+                pulsarClient.newReader().topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -271,7 +270,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMultiMultipleReaders() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/testMultiMultipleReaders";
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
                 .create();
@@ -309,7 +308,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testTopicStats() throws Exception {
-        String topicName = "persistent://my-property/my-ns/testTopicStats";
+        String topicName = newTopicName();
 
         Reader<byte[]> reader1 = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
@@ -330,7 +329,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMultiTopicStats() throws Exception {
-        String topicName = 
"persistent://my-property/my-ns/testMultiTopicStats";
+        String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
 
         Reader<byte[]> reader1 = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
@@ -352,7 +351,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "variationsForResetOnLatestMsg")
     public void testReaderOnLatestMessage(boolean startInclusive, int 
numOfMessages) throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderOnLatestMessage";
+        final String topicName = newTopicName();
         final int halfOfMsgs = numOfMessages / 2;
 
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -397,8 +396,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "variationsForResetOnLatestMsg")
     public void testMultiReaderOnLatestMessage(boolean startInclusive, int 
numOfMessages) throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/testMultiReaderOnLatestMessage"
-                + System.currentTimeMillis();
+        final String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
         final int halfOfMsgs = numOfMessages / 2;
 
@@ -446,8 +444,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testReaderOnSpecificMessage() throws Exception {
+        final String topicName = newTopicName();
         Producer<byte[]> producer =
-                
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
+                pulsarClient.newProducer().topic(topicName)
                 .create();
         List<MessageId> messageIds = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
@@ -456,7 +455,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         Reader<byte[]> reader =
-                
pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
+                pulsarClient.newReader().topic(topicName)
                 .startMessageId(messageIds.get(4)).create();
 
         // Publish more messages and verify the readers only sees messages 
starting from the intended message
@@ -478,8 +477,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testReaderOnSpecificMessageWithBatches() throws Exception {
+        final String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+                .topic(topicName)
                 .enableBatching(true).batchingMaxPublishDelay(100, 
TimeUnit.MILLISECONDS).create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -489,7 +489,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         // Write one sync message to ensure everything before got persistend
         producer.send("my-message-10".getBytes());
         Reader<byte[]> reader1 = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+                .topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
         MessageId lastMessageId = null;
@@ -503,7 +503,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         System.out.println("CREATING READER ON MSG ID: " + lastMessageId);
 
         Reader<byte[]> reader2 = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+                .topic(topicName)
                 .startMessageId(lastMessageId).create();
 
         for (int i = 5; i < 11; i++) {
@@ -561,13 +561,14 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
         final int totalMsg = 10;
 
+        final String topicName = newTopicName();
         Set<String> messageSet = new HashSet<>();
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
+                .topic(topicName).startMessageId(MessageId.latest)
                 .cryptoKeyReader(new EncKeyReader()).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1")
+                .topic(topicName)
                 .addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
@@ -632,7 +633,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         final int totalMsg = 10;
 
         Set<String> messageSet = new HashSet<>();
-        String topic = 
"persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1";
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(topic).startMessageId(MessageId.latest)
@@ -659,8 +660,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testDefaultCryptoKeyReader() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/test-reader-default-crypto-key-reader"
-                + System.currentTimeMillis();
+        final String topic = newTopicName();
         final String ecdsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
         final String ecdsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
         final String ecdsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlI"
@@ -789,11 +789,12 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testSimpleReaderReachEndOfTopic() throws Exception {
+        final String topicName = newTopicName();
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
+                .topic(topicName)
                 .startMessageId(MessageId.earliest).create();
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
+                .topic(topicName)
                 .create();
 
         // no data write, should return false
@@ -847,7 +848,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testSimpleMultiReaderReachEndOfTopic() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic";
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         Reader<byte[]> reader = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
@@ -901,12 +902,13 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderReachEndOfTopicOnMessageWithBatches() throws 
Exception {
+        final String topicName = newTopicName();
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+                .topic(topicName)
                 .startMessageId(MessageId.earliest).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+                .topic(topicName)
                 .enableBatching(true).batchingMaxPublishDelay(100, 
TimeUnit.MILLISECONDS).create();
 
         // no data write, should return false
@@ -944,7 +946,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMultiReaderReachEndOfTopicOnMessageWithBatches() throws 
Exception {
-        String topic = 
"persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches";
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(topic)
@@ -989,7 +991,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testMessageAvailableAfterRestart() throws Exception {
-        String topic = 
"persistent://my-property/my-ns/testMessageAvailableAfterRestart";
+        String topic = newTopicName();
         String content = "my-message-1";
 
         // stop retention from cleaning up
@@ -1010,7 +1012,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         }
 
         // cause broker to drop topic. Will be loaded next time we access it
-        
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
+        getTopicReference(topic).get().close(false).get();
 
         try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
                 .startMessageId(MessageId.earliest).create()) {
@@ -1025,7 +1027,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testMultiReaderMessageAvailableAfterRestart() throws Exception 
{
-        String topic = 
"persistent://my-property/my-ns/testMessageAvailableAfterRestart2";
+        String topic = newTopicName();
         String content = "my-message-1";
         admin.topics().createPartitionedTopic(topic, 3);
         // stop retention from cleaning up
@@ -1046,9 +1048,9 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         }
 
         // cause broker to drop topic. Will be loaded next time we access it
-        pulsar.getBrokerService().getTopics().keySet().forEach(topicName -> {
+        
SharedPulsarCluster.get().getPulsarService().getBrokerService().getTopics().keySet().forEach(topicName
 -> {
             try {
-                
pulsar.getBrokerService().getTopicReference(topicName).get().close(false).get();
+                getTopicReference(topicName).get().close(false).get();
             } catch (Exception e) {
                 fail();
             }
@@ -1067,7 +1069,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test(dataProvider = "variationsForHasMessageAvailable")
     public void testHasMessageAvailable(boolean enableBatch, boolean 
startInclusive) throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/HasMessageAvailable";
+        final String topicName = newTopicName();
         final int numOfMessage = 100;
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -1128,7 +1130,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test(timeOut = 20000)
     public void testHasMessageAvailable() throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
+        final String topicName = newTopicName();
         final int numOfMessage = 10;
 
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1205,7 +1207,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     @Test
     public void testReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
         final int numOfMessage = 10;
-        final String topicName = 
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
+        final String topicName = newTopicName();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topicName).create();
@@ -1229,7 +1231,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     @Test
     public void testMultiReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
         final int numOfMessage = 10;
-        final String topicName = 
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
+        final String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 3);
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
@@ -1251,7 +1253,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws 
Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+        final String topicName = newTopicName();
         final int numOfMessage = 10;
 
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1300,7 +1302,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testMultiReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws 
Exception {
-        final String topicName = 
"persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic";
+        final String topicName = newTopicName();
         final int numOfMessage = 10;
         admin.topics().createPartitionedTopic(topicName, 3);
 
@@ -1346,7 +1348,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws 
Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
+        final String topicName = newTopicName();
         final int numOfMessage = 100;
         final int halfMessages = numOfMessage / 2;
 
@@ -1401,7 +1403,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws 
Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
+        final String topicName = newTopicName();
         final int numOfMessage = 10;
         final int halfMessages = numOfMessage / 2;
 
@@ -1434,8 +1436,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws 
Exception {
-        final String topicName = 
"persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic"
-                + System.currentTimeMillis();
+        final String topicName = newTopicName();
         final int numOfMessage = 10;
         final int halfMessages = numOfMessage / 2;
         admin.topics().createPartitionedTopic(topicName, 3);
@@ -1464,7 +1465,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     @Test(dataProvider = "variationsForExpectedPos")
     public void testReaderStartMessageIdAtExpectedPos(boolean batching, 
boolean startInclusive, int numOfMessages)
             throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
+        final String topicName = newTopicName();
         final int resetIndex = new Random().nextInt(numOfMessages); // Choose 
some random index to reset
         final int firstMessage = startInclusive ? resetIndex : resetIndex + 1; 
// First message of reset
 
@@ -1526,7 +1527,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderBuilderConcurrentCreate() throws Exception {
-        String topicName = 
"persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_";
+        String topicName = newTopicName() + "-";
         int numTopic = 30;
         ReaderBuilder<byte[]> builder = 
pulsarClient.newReader().startMessageId(MessageId.earliest);
 
@@ -1554,7 +1555,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test(timeOut = 10000)
     public void testMultiReaderBuilderConcurrentCreate() throws Exception {
-        String topicName = 
"persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_";
+        String topicName = newTopicName() + "-";
         int numTopic = 30;
         ReaderBuilder<byte[]> builder = 
pulsarClient.newReader().startMessageId(MessageId.earliest);
 
@@ -1583,7 +1584,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderStartInMiddleOfBatch() throws Exception {
-        final String topicName = 
"persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
+        final String topicName = newTopicName();
         final int numOfMessage = 100;
 
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1674,4 +1675,11 @@ public class TopicReaderTest extends 
ProducerConsumerBase {
         assertTrue(r2Inclusive.hasMessageAvailable());
         assertTrue(r3.hasMessageAvailable());
     }
+
+    private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T 
receivedMessage,
+            T expectedMessage) {
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
index fa8b3a7fcf0..ae321c4b2be 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
@@ -28,43 +28,25 @@ import java.nio.file.Paths;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 @Slf4j
 @Test(groups = "broker-api")
-public class ConsumerDecryptFailListenerTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerDecryptFailListenerTest extends SharedPulsarBaseTest {
 
     @Test(timeOut = 10000)
     public void testDecryptFailListenerException() {
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://my-property/my-ns/testReaderBuildExceptionsWithSetReaderDecryptFailure"
-        );
+        final String topic = newTopicName();
         // should throw exception if decryptFailListener is set without 
setting a messageListener
         assertThatThrownBy(
                 () -> pulsarClient.newConsumer().topic(topic)
@@ -93,9 +75,7 @@ public class ConsumerDecryptFailListenerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testDecryptFailListenerBehaviorWithConsumerImpl() throws 
Exception {
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
-        );
+        final String topic = newTopicName();
         admin.topics().createNonPartitionedTopic(topic);
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topic)
                 .decryptFailListener(((reader, msg) -> {
@@ -125,9 +105,7 @@ public class ConsumerDecryptFailListenerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 20000)
     public void testDecryptFailListenerBehaviorWithMultiConsumerImpl() throws 
Exception {
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithMultiConsumerImpl"
-        );
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 3);
         MultiTopicsConsumerImpl<byte[]> consumer1 = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
                 .topic(topic)
@@ -159,9 +137,7 @@ public class ConsumerDecryptFailListenerTest extends 
ProducerConsumerBase {
 
     @Test(timeOut = 30000)
     public void testDecryptFailListenerReceiveMessage() throws Exception {
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://my-property/my-ns/testDecryptFailListenerReceiveMessage"
-        );
+        final String topic = newTopicName();
         admin.topics().createNonPartitionedTopic(topic);
         int totalMessages = 10;
         CountDownLatch countDownLatch = new CountDownLatch(10);
@@ -199,9 +175,7 @@ public class ConsumerDecryptFailListenerTest extends 
ProducerConsumerBase {
      */
     @Test(timeOut = 30000)
     public void testBothDecryptFailListenerAndMessageListenerReceiveMessage() 
throws Exception {
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://my-property/my-ns/testBothDecryptFailListenerAndMessageListenerReceiveMessage"
-        );
+        final String topic = newTopicName();
         int totalMessages = 10;
         CountDownLatch decryptSuccessCount = new CountDownLatch(5);
         CountDownLatch decryptFailCount = new CountDownLatch(5);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 203715ca7db..ff31409a241 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
@@ -43,39 +44,23 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-impl")
-public class MessageChunkingSharedTest extends ProducerConsumerBase {
+public class MessageChunkingSharedTest extends SharedPulsarBaseTest {
 
     private static final int MAX_MESSAGE_SIZE = 100;
 
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test
     public void testSingleConsumer() throws Exception {
-        final String topic = "my-property/my-ns/test-single-consumer";
+        final String topic = newTopicName();
         @Cleanup final Producer<String> producer = createProducer(topic);
         @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -108,7 +93,7 @@ public class MessageChunkingSharedTest extends 
ProducerConsumerBase {
 
     @Test
     public void testMultiConsumers() throws Exception {
-        final String topic = "my-property/my-ns/test-multi-consumers";
+        final String topic = newTopicName();
         @Cleanup final Producer<String> producer = createProducer(topic);
         final ConsumerBuilder<String> consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -148,7 +133,7 @@ public class MessageChunkingSharedTest extends 
ProducerConsumerBase {
 
     @Test
     public void testInterleavedChunks() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/test-interleaved-chunks";
+        final String topic = newTopicName();
         final ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionName("sub")
@@ -158,8 +143,7 @@ public class MessageChunkingSharedTest extends 
ProducerConsumerBase {
                         (consumer, msg) -> 
receivedUuidList1.add(msg.getProducerName() + "-" + msg.getSequenceId()))
                 .consumerName("consumer-1")
                 .subscribe();
-        final PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
-                .getTopicIfExists(topic).get().orElse(null);
+        final PersistentTopic persistentTopic = (PersistentTopic) 
getTopicIfExists(topic).get().orElse(null);
         assertNotNull(persistentTopic);
         // send: A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2
         sendNonChunk(persistentTopic, "A", 0);
@@ -196,7 +180,7 @@ public class MessageChunkingSharedTest extends 
ProducerConsumerBase {
     // Issue #25220
     @Test
     public void testNegativeAckChunkedMessage() throws Exception {
-        final String topic = 
"persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
+        final String topic = newTopicName();
 
         @Cleanup
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
index a458d710697..5eb5cc050b0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
@@ -25,36 +25,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.TopicMetadata;
 import 
org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-impl")
-public class PartialPartitionedProducerTest extends ProducerConsumerBase {
-    @Override
-    @BeforeClass
-    public void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @Override
-    @AfterClass(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class PartialPartitionedProducerTest extends SharedPulsarBaseTest {
 
     @Test
     public void testPtWithSinglePartition() throws Throwable {
-        final String topic = 
BrokerTestUtil.newUniqueName("pt-with-single-routing");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 10);
 
         @Cleanup
@@ -74,7 +59,7 @@ public class PartialPartitionedProducerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testPtWithPartialPartition() throws Throwable {
-        final String topic = 
BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 10);
 
         @Cleanup
@@ -96,7 +81,7 @@ public class PartialPartitionedProducerTest extends 
ProducerConsumerBase {
     // AddPartitionTest
     @Test
     public void testPtLazyLoading() throws Throwable {
-        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 10);
 
         @Cleanup
@@ -128,7 +113,7 @@ public class PartialPartitionedProducerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testPtLoadingNotSharedMode() throws Throwable {
-        final String topic = 
BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 10);
 
         @Cleanup
@@ -162,7 +147,7 @@ public class PartialPartitionedProducerTest extends 
ProducerConsumerBase {
     // AddPartitionAndLimitTest
     @Test
     public void testPtUpdateWithPartialPartition() throws Throwable {
-        final String topic = 
BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 2);
 
         final Field field = 
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
@@ -214,7 +199,7 @@ public class PartialPartitionedProducerTest extends 
ProducerConsumerBase {
 
     @Test
     public void testPtUpdateNotSharedMode() throws Throwable {
-        final String topic = 
BrokerTestUtil.newUniqueName("pt-update-not-shared");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 2);
 
         final Field field = 
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
index e18dc372f8c..205211bff73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
@@ -31,12 +31,11 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
@@ -46,36 +45,17 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.MockedStatic;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
-public class ProducerMemoryLeakTest extends ProducerConsumerBase {
+public class ProducerMemoryLeakTest extends SharedPulsarBaseTest {
 
-    private static final String NAMESPACE_NEVER_COMPATIBLE = 
"public/schema-never-compatible";
-
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-        admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE);
-        
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE,
-                SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
 
     @Test
     public void testSendQueueIsFull() throws Exception {
-        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING)
                 .blockIfQueueFull(false).maxPendingMessages(1)
@@ -126,7 +106,7 @@ public class ProducerMemoryLeakTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "maxMessageSizeAndCompressions")
     public void testSendMessageSizeExceeded(int maxMessageSize, 
CompressionType compressionType) throws Exception {
-        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
                 .compressionType(compressionType)
@@ -205,7 +185,7 @@ public class ProducerMemoryLeakTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "maxMessageSizes")
     public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws 
Exception {
-        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
                 .enableBatching(true)
@@ -249,7 +229,7 @@ public class ProducerMemoryLeakTest extends 
ProducerConsumerBase {
 
     @Test
     public void testSendAfterClosedProducer() throws Exception {
-        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl<String> producer =
                 (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
@@ -278,8 +258,9 @@ public class ProducerMemoryLeakTest extends 
ProducerConsumerBase {
 
     @Test
     public void testBrokenSchema() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ NAMESPACE_NEVER_COMPATIBLE
-                + "/tp");
+        admin.namespaces().setSchemaCompatibilityStrategy(getNamespace(),
+                SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl producer =
                 (ProducerImpl) 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
@@ -332,7 +313,7 @@ public class ProducerMemoryLeakTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "failedInterceptAt")
     public void testInterceptorError(String method) throws Exception {
-        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+        final String topicName = newTopicName();
         admin.topics().createNonPartitionedTopic(topicName);
         ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING).topic(topicName)
                 .intercept(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
index 17c4271b873..17fab63189d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
@@ -21,56 +21,50 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Method;
 import java.net.ServerSocket;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class PulsarMultiHostClientTest extends ProducerConsumerBase {
+public class PulsarMultiHostClientTest extends SharedPulsarBaseTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarMultiHostClientTest.class);
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
+    protected String methodName;
 
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
+    @BeforeMethod(alwaysRun = true)
+    public void setTestMethodName(Method m) {
+        methodName = m.getName();
     }
 
     @Test
     public void testGetPartitionedTopicMetaData() {
         log.info("-- Starting {} test --", methodName);
 
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        final String topicName = newTopicName();
         final String subscriptionName = "my-subscriber-name";
 
         try {
-            String url = pulsar.getWebServiceAddress();
-            if (isTcpLookup) {
-                url = pulsar.getBrokerServiceUrl();
-            }
             @Cleanup
-            PulsarClient client = newPulsarClient(url, 0);
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(getWebServiceUrl())
+                    .statsInterval(0, TimeUnit.SECONDS)
+                    .build();
 
             Consumer<byte[]> consumer = 
client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -90,7 +84,7 @@ public class PulsarMultiHostClientTest extends 
ProducerConsumerBase {
     public void testGetPartitionedTopicDataTimeout() {
         log.info("-- Starting {} test --", methodName);
 
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        final String topicName = newTopicName();
 
         String url = "http://localhost:" + getFreePort() + ",localhost:" + 
getFreePort();
 
@@ -124,17 +118,17 @@ public class PulsarMultiHostClientTest extends 
ProducerConsumerBase {
     public void testMultiHostUrlRetrySuccess() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        final String topicName = newTopicName();
         final String subscriptionName = "my-subscriber-name";
 
         // Multi hosts included an unreached port and the actual port for 
verify retry logic
         String urlsWithUnreached = "http://localhost:51000,localhost:"
-                + new URI(pulsar.getWebServiceAddress()).getPort();
-        if (isTcpLookup) {
-            urlsWithUnreached = "pulsar://localhost:51000,localhost" + new 
URI(pulsar.getBrokerServiceUrl()).getPort();
-        }
+                + new URI(getWebServiceUrl()).getPort();
         @Cleanup
-        PulsarClient client = newPulsarClient(urlsWithUnreached, 0);
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(urlsWithUnreached)
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
 
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
             .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -153,7 +147,8 @@ public class PulsarMultiHostClientTest extends 
ProducerConsumerBase {
             String receivedMessage = new String(msg.getData());
             log.info("Received message: [{}]", receivedMessage);
             String expectedMessage = "my-message-" + i;
-            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            Assert.assertEquals(receivedMessage, expectedMessage);
+            Assert.assertTrue(messageSet.add(receivedMessage), "Duplicate 
message: " + receivedMessage);
         }
 
         // Acknowledge the consumption of all messages at once

Reply via email to