This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 765c46e2747 [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25338)
765c46e2747 is described below
commit 765c46e274782cea3001430fce56f5862030abf2
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Mar 21 14:14:08 2026 -0700
[cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25338)
Co-authored-by: Lari Hotari <[email protected]>
---
.../broker/service/ConsumedLedgersTrimTest.java | 7 +-
.../service/CurrentLedgerRolloverIfFullTest.java | 2 +
.../pulsar/broker/service/SharedPulsarCluster.java | 1 +
.../pulsar/client/api/ClientDeduplicationTest.java | 69 +++----
.../apache/pulsar/client/api/InterceptorsTest.java | 101 +++++-----
.../api/PartitionedProducerConsumerTest.java | 216 ++++++++++-----------
.../client/api/PersistentTopicTerminateTest.java | 6 +-
.../apache/pulsar/client/api/RetryTopicTest.java | 131 ++++++-------
.../apache/pulsar/client/api/TopicReaderTest.java | 134 +++++++------
.../impl/ConsumerDecryptFailListenerTest.java | 40 +---
.../client/impl/MessageChunkingSharedTest.java | 30 +--
.../impl/PartialPartitionedProducerTest.java | 31 +--
.../pulsar/client/impl/ProducerMemoryLeakTest.java | 39 +---
.../client/impl/PulsarMultiHostClientTest.java | 49 +++--
14 files changed, 376 insertions(+), 480 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 7d673decd2a..d789948fcd5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -50,6 +50,8 @@ public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
@Test
public void testConsumedLedgersTrim() throws Exception {
+ // Disable dedup so the pulsar.dedup cursor doesn't block ledger trimming
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
// Set infinite retention at namespace level so ledgers are preserved until explicitly trimmed
admin.namespaces().setRetention(getNamespace(), new RetentionPolicies(-1, -1));
@@ -106,6 +108,8 @@ public class ConsumedLedgersTrimTest extends
SharedPulsarBaseTest {
@Test
public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
+ // Disable dedup so the pulsar.dedup cursor doesn't block ledger
trimming
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
// Set infinite retention at namespace level so ledgers are preserved
until explicitly trimmed
admin.namespaces().setRetention(getNamespace(), new
RetentionPolicies(-1, -1));
@@ -178,6 +182,8 @@ public class ConsumedLedgersTrimTest extends
SharedPulsarBaseTest {
@Test
public void testAdminTrimLedgers() throws Exception {
+ // Disable dedup so the pulsar.dedup cursor doesn't block ledger
trimming
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
final String subscriptionName = "my-sub";
final int maxEntriesPerLedger = 2;
final int partitionedNum = 3;
@@ -230,7 +236,6 @@ public class ConsumedLedgersTrimTest extends
SharedPulsarBaseTest {
"Expected at most 2 ledgers after trim, but found "
+ managedLedger.getLedgersInfoAsList().size());
});
-
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 226456df597..be23800e6d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -37,6 +37,8 @@ public class CurrentLedgerRolloverIfFullTest extends
SharedPulsarBaseTest {
@Test
public void testCurrentLedgerRolloverIfFull() throws Exception {
+ // Disable dedup so the pulsar.dedup cursor doesn't block ledger
trimming
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
final String topicName = newTopicName();
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index d67c8194edf..a561712b8ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -146,6 +146,7 @@ public class SharedPulsarCluster {
config.setForceDeleteNamespaceAllowed(true);
config.setForceDeleteTenantAllowed(true);
config.setBrokerDeleteInactiveTopicsEnabled(false);
+ config.setBrokerDeduplicationEnabled(true);
// Reduce thread pool sizes for faster startup (fewer threads to
create)
config.setNumIOThreads(2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index 4e96252056d..aaaca5a8bba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -34,18 +34,17 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class ClientDeduplicationTest extends ProducerConsumerBase {
+public class ClientDeduplicationTest extends SharedPulsarBaseTest {
@DataProvider
public static Object[][] batchingTypes() {
@@ -55,22 +54,9 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
};
}
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test(priority = -1)
public void testNamespaceDeduplicationApi() throws Exception {
- final String namespace = "my-property/my-ns";
+ final String namespace = getNamespace();
assertNull(admin.namespaces().getDeduplicationStatus(namespace));
admin.namespaces().setDeduplicationStatus(namespace, true);
Awaitility.await().untilAsserted(() ->
assertTrue(admin.namespaces().getDeduplicationStatus(namespace)));
@@ -82,8 +68,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test
public void testProducerSequenceAfterReconnect() throws Exception {
- final String topic =
"persistent://my-property/my-ns/testProducerSequenceAfterReconnect";
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ final String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topic)
.producerName("my-producer-name");
@@ -113,8 +99,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test
public void testProducerSequenceAfterRestart() throws Exception {
- String topic =
"persistent://my-property/my-ns/testProducerSequenceAfterRestart";
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topic)
.producerName("my-producer-name");
@@ -130,8 +116,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
producer.close();
- // Kill and restart broker
- restartBroker();
+ // Unload topic to force reload of dedup state
+ admin.topics().unload(topic);
producer = producerBuilder.create();
assertEquals(producer.getLastSequenceId(), 9L);
@@ -147,8 +133,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testProducerDeduplication() throws Exception {
- String topic =
"persistent://my-property/my-ns/testProducerDeduplication";
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
// Set infinite timeout
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topic)
@@ -180,8 +166,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
assertNull(msg);
- // Kill and restart broker
- restartBroker();
+ // Unload topic to force reload of dedup state
+ admin.topics().unload(topic);
producer = producerBuilder.create();
assertEquals(producer.getLastSequenceId(), 2L);
@@ -198,9 +184,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test(timeOut = 30000, dataProvider = "batchingTypes")
public void
testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder
batcherBuilder) throws Exception {
- String topic =
"persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
- + System.currentTimeMillis();
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
// Set infinite timeout
ProducerBuilder<byte[]> producerBuilder =
@@ -244,8 +229,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
assertNull(msg);
producer.close();
- // Kill and restart broker
- restartBroker();
+ // Unload topic to force reload of dedup state
+ admin.topics().unload(topic);
producer = producerBuilder.create();
assertEquals(producer.getLastSequenceId(), 6L);
@@ -263,8 +248,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testProducerDeduplicationNonBatchAsync() throws Exception {
- String topic =
"persistent://my-property/my-ns/testProducerDeduplicationNonBatchAsync";
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
// Set infinite timeout
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topic)
@@ -295,8 +280,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
assertNull(msg);
- // Kill and restart broker
- restartBroker();
+ // Unload topic to force reload of dedup state
+ admin.topics().unload(topic);
producer = producerBuilder.create();
assertEquals(producer.getLastSequenceId(), 5L);
@@ -313,8 +298,8 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testKeyBasedBatchingOrder() throws Exception {
- final String topic =
"persistent://my-property/my-ns/test-key-based-batching-order";
- admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);
+ final String topic = newTopicName();
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -379,13 +364,13 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
@Test
public void testUpdateSequenceIdInSyncCodeSegment() throws Exception {
- final String topic =
"persistent://my-property/my-ns/testUpdateSequenceIdInSyncCodeSegment";
+ final String topic = newTopicName();
int totalMessage = 200;
int threadSize = 5;
- String topicName = "subscription";
+ String subscriptionName = "subscription";
@Cleanup("shutdownNow")
ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
- conf.setBrokerDeduplicationEnabled(true);
+ admin.namespaces().setDeduplicationStatus(getNamespace(), true);
//build producer/consumer
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -397,7 +382,7 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName(topicName)
+ .subscriptionName(subscriptionName)
.subscribe();
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index 19665de29a1..bb2cdeb0309 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.api;
-import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -33,8 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
@@ -45,29 +43,14 @@ import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class InterceptorsTest extends ProducerConsumerBase {
+public class InterceptorsTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(InterceptorsTest.class);
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@DataProvider(name = "receiverQueueSize")
public Object[][] getReceiverQueueSize() {
return new Object[][] { { 0 }, { 1000 } };
@@ -81,18 +64,16 @@ public class InterceptorsTest extends ProducerConsumerBase {
return new Object[][] {{ 0 }, { 3 }};
}
- @DataProvider(name = "topics")
- public Object[][] getTopics() {
- return new Object[][] {{
List.of("persistent://my-property/my-ns/my-topic") },
- { List.of("persistent://my-property/my-ns/my-topic",
"persistent://my-property/my-ns/my-topic1") }};
+ @DataProvider(name = "topicCount")
+ public Object[][] getTopicCount() {
+ return new Object[][] {{ 1 }, { 2 }};
}
@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
- String ns = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
- admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
+ String ns = getNamespace();
admin.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
abstract class BaseInterceptor implements
@@ -164,6 +145,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testProducerInterceptorsWithExceptions() throws
PulsarClientException {
+ final String topicName = newTopicName();
ProducerInterceptor<String> interceptor = new
ProducerInterceptor<String>() {
@Override
public void close() {
@@ -182,7 +164,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.intercept(interceptor)
.create();
@@ -193,6 +175,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testProducerInterceptorsWithErrors() throws
PulsarClientException {
+ final String topicName = newTopicName();
ProducerInterceptor<String> interceptor = new
ProducerInterceptor<String>() {
@Override
public void close() {
@@ -211,7 +194,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.intercept(interceptor)
.create();
@@ -222,6 +205,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testProducerInterceptorAccessMessageData() throws
PulsarClientException {
+ final String topicName = newTopicName();
List<String> messageDataInBeforeSend =
Collections.synchronizedList(new ArrayList<>());
List<String> messageDataOnSendAcknowledgement =
Collections.synchronizedList(new ArrayList<>());
ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() {
@@ -243,7 +227,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
};
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.intercept(interceptor)
.create();
@@ -264,6 +248,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testConsumerInterceptorWithErrors() throws
PulsarClientException {
+ final String topicName = newTopicName();
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
@Override
public void close() {
@@ -296,7 +281,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
};
Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic-exception")
+ .topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription-ack-timeout")
@@ -304,14 +289,14 @@ public class InterceptorsTest extends
ProducerConsumerBase {
.subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic-exception")
+ .topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription-negative")
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic-exception")
+ .topic(topicName)
.create();
producer.newMessage().value("Hello Pulsar!").send();
@@ -337,6 +322,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test(dataProvider = "receiverQueueSize")
public void testConsumerInterceptorWithSingleTopicSubscribe(Integer
receiverQueueSize) throws Exception {
+ final String topicName = newTopicName();
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
@Override
public void close() {
@@ -372,7 +358,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
};
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
@@ -380,7 +366,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.enableBatching(false)
.create();
@@ -413,7 +399,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
final CompletableFuture<Message<String>> future = new
CompletableFuture<>();
consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
@@ -446,6 +432,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testConsumerInterceptorWithMultiTopicSubscribe() throws
PulsarClientException {
+ final String topicName = newTopicName();
+ final String topicName1 = newTopicName();
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
@Override
@@ -482,15 +470,15 @@ public class InterceptorsTest extends
ProducerConsumerBase {
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.create();
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topicName1)
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic",
"persistent://my-property/my-ns/my-topic1")
+ .topic(topicName, topicName1)
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
@@ -527,7 +515,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
AtomicInteger beforeConsumeCount = new AtomicInteger(0);
PulsarClient client = PulsarClient.builder()
- .serviceUrl(lookupUrl.toString())
+ .serviceUrl(getBrokerServiceUrl())
.listenerThreads(1)
.build();
@@ -560,7 +548,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
};
- final String topicName = "persistent://my-property/my-ns/my-topic";
+ final String topicName = newTopicName();
if (partitions > 0) {
admin.topics().createPartitionedTopic(topicName, partitions);
@@ -584,7 +572,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
.subscribe();
Producer<String> producer = client.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.create();
final int messages = 10;
@@ -602,6 +590,8 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testConsumerInterceptorWithPatternTopicSubscribe() throws
PulsarClientException {
+ final String topicName = newTopicName();
+ final String topicName1 = newTopicName();
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
@Override
@@ -638,15 +628,15 @@ public class InterceptorsTest extends
ProducerConsumerBase {
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.create();
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topicName1)
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topicsPattern("persistent://my-property/my-ns/my-.*")
+ .topicsPattern("persistent://" + getNamespace() + "/.*")
.subscriptionType(SubscriptionType.Shared)
.intercept(interceptor)
.subscriptionName("my-subscription")
@@ -674,6 +664,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test
public void testConsumerInterceptorForAcknowledgeCumulative() throws
PulsarClientException {
+ final String topicName = newTopicName();
List<MessageId> ackHolder = new ArrayList<>();
@@ -715,14 +706,14 @@ public class InterceptorsTest extends
ProducerConsumerBase {
};
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.subscriptionName("my-subscription")
.subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic(topicName)
.create();
for (int i = 0; i < 100; i++) {
@@ -748,9 +739,13 @@ public class InterceptorsTest extends ProducerConsumerBase
{
consumer.close();
}
- @Test(dataProvider = "topics")
- public void testConsumerInterceptorForNegativeAcksSend(List<String> topics)
+ @Test(dataProvider = "topicCount")
+ public void testConsumerInterceptorForNegativeAcksSend(int topicCount)
throws PulsarClientException, InterruptedException {
+ List<String> topics = new ArrayList<>();
+ for (int i = 0; i < topicCount; i++) {
+ topics.add(newTopicName());
+ }
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
@@ -820,9 +815,13 @@ public class InterceptorsTest extends ProducerConsumerBase
{
consumer.close();
}
- @Test(dataProvider = "topics")
- public void testConsumerInterceptorForAckTimeoutSend(List<String> topics)
throws PulsarClientException,
+ @Test(dataProvider = "topicCount")
+ public void testConsumerInterceptorForAckTimeoutSend(int topicCount)
throws PulsarClientException,
InterruptedException {
+ List<String> topics = new ArrayList<>();
+ for (int i = 0; i < topicCount; i++) {
+ topics.add(newTopicName());
+ }
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
@@ -890,7 +889,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test(timeOut = 1000 * 30, dataProvider = "topicPartition")
public void testReaderInterceptor(int topicPartition) throws Exception {
- String topic = "reader-interceptor-" + topicPartition + "-" +
RandomStringUtils.randomAlphabetic(5);
+ String topic = newTopicName();
if (topicPartition > 0) {
admin.topics().createPartitionedTopic(topic, topicPartition);
}
@@ -999,7 +998,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
@Test(dataProvider = "topicPartition")
public void testConsumerInterceptorForOnArrive(int topicPartition) throws
PulsarClientException,
InterruptedException, PulsarAdminException {
- String topicName = "persistent://my-property/my-ns/on-arrive";
+ String topicName = newTopicName();
if (topicPartition > 0) {
admin.topics().createPartitionedTopic(topicName, topicPartition);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index ba4054ef700..a0e487bf3ad 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.util.Timeout;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -36,6 +37,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
@@ -47,48 +49,48 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
+public class PartitionedProducerConsumerTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(PartitionedProducerConsumerTest.class);
private ExecutorService executor;
+ protected String methodName;
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
+ @BeforeMethod(alwaysRun = true)
+ public void setTestMethodName(Method m) throws Exception {
+ methodName = m.getName();
+ }
+ @BeforeClass(alwaysRun = true)
+ public void setupExecutor() throws Exception {
executor = Executors.newFixedThreadPool(1, new
DefaultThreadFactory("PartitionedProducerConsumerTest"));
}
@AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
+ public void cleanupExecutor() throws Exception {
executor.shutdownNow();
}
@Test(timeOut = 30000)
public void testRoundRobinProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
int numPartitions = 4;
- TopicName topicName =
TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-"
- + System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").subscribe();
- assertEquals(consumer.getTopic(), topicName.toString());
+ assertEquals(consumer.getTopic(), topicName);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -110,7 +112,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -119,24 +120,23 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
public void testPartitionedTopicNameWithSpecialCharacter() throws
Exception {
log.info("-- Starting {} test --", methodName);
+ final String topicName = newTopicName();
int numPartitions = 4;
final String specialCharacter = "! * ' ( ) ; : @ & = + $ , \\ ? % # [
]";
- TopicName topicName =
TopicName.get("persistent://my-property/my-ns/my-partitionedtopic1-"
- + System.currentTimeMillis() + specialCharacter);
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ String topicWithSpecialChar = topicName + specialCharacter;
+ admin.topics().createPartitionedTopic(topicWithSpecialChar,
numPartitions);
// Try to create producer which does lookup and create connection with
broker
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicWithSpecialChar)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
producer.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 30000)
public void testCustomPartitionProducer() throws Exception {
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
- TopicName topicName = null;
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
Producer<byte[]> producer = null;
Consumer<byte[]> consumer = null;
final int messageCount = 16;
@@ -144,16 +144,14 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
int numPartitions = 4;
- topicName = TopicName
-
.get("persistent://my-property/my-ns/my-partitionedtopic1-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- producer = pulsarClient.newProducer().topic(topicName.toString())
+ producer = pulsarClient.newProducer().topic(topicName)
.messageRouter(new AlwaysTwoMessageRouter())
.create();
- consumer = pulsarClient.newConsumer().topic(topicName.toString())
+ consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").subscribe();
for (int i = 0; i < messageCount; i++) {
@@ -178,7 +176,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -187,18 +184,17 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testSinglePartitionProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic2-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").subscribe();
for (int i = 0; i < 10; i++) {
@@ -223,7 +219,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -231,18 +226,17 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testKeyBasedProducer() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic3-" +
System.currentTimeMillis());
String dummyKey1 = "dummykey1";
String dummyKey2 = "dummykey2";
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString()).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").subscribe();
for (int i = 0; i < 5; i++) {
@@ -269,7 +263,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -286,11 +279,10 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 100000)
public void testPauseAndResume() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
int numPartitions = 2;
- String topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic-pr-"
+ System.currentTimeMillis()).toString();
admin.topics().createPartitionedTopic(topicName, numPartitions);
@@ -341,14 +333,13 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testPauseAndResumeWithUnloading() throws Exception {
- final String topicName =
"persistent://my-property/my-ns/pause-and-resume-with-unloading"
- + System.currentTimeMillis();
+ final String topicName = newTopicName();
final String subName = "sub";
final int receiverQueueSize = 20;
final int numPartitions = 2;
final int numMessages = receiverQueueSize * numPartitions;
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ PulsarClient pulsarClient = newPulsarClient();
admin.topics().createPartitionedTopic(topicName, numPartitions);
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new
CountDownLatch(numMessages));
@@ -388,22 +379,20 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
producer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName, true);
}
@Test(timeOut = 30000)
public void testInvalidSequence() throws Exception {
log.info("-- Starting {} test --", methodName);
+ final String topicName = newTopicName();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic4-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").subscribe();
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@@ -442,17 +431,15 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
// ok
}
- admin.topics().deletePartitionedTopic(topicName.toString());
}
@Test(timeOut = 30000)
public void testSillyUser() throws Exception {
+ final String topicName = newTopicName();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic5-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
Producer<byte[]> producer = null;
Consumer<byte[]> consumer = null;
@@ -472,9 +459,9 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
}
try {
- producer =
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+ producer =
pulsarClient.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
- consumer =
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
+ consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
producer.send("message1".getBytes());
producer.send("message2".getBytes());
/* Message<byte[]> msg1 = */ consumer.receive();
@@ -486,27 +473,25 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.close();
}
- admin.topics().deletePartitionedTopic(topicName.toString());
}
@Test(timeOut = 30000)
public void testDeletePartitionedTopic() throws Exception {
+ final String topicName = newTopicName();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic6-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString()).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub")
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscribe();
consumer.unsubscribe();
consumer.close();
producer.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
+ admin.topics().deletePartitionedTopic(topicName);
- Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName.toString()).create();
+ Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName).create();
if (producer1 instanceof PartitionedProducerImpl) {
Assert.fail("should fail since partitioned topic was deleted");
}
@@ -515,21 +500,20 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testAsyncPartitionedProducerConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
+ final String topicName = newTopicName();
final int totalMsg = 100;
final Set<String> produceMsgs = new HashSet<>();
final Set<String> consumeMsgs = new HashSet<>();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic1-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe();
// produce messages
@@ -556,7 +540,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
producer.close();
consumer.unsubscribe();
consumer.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -564,23 +547,22 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws
Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
final int totalMsg = 100;
final Set<String> produceMsgs = new HashSet<>();
final Set<String> consumeMsgs = new HashSet<>();
int numPartitions = 4;
- TopicName topicName = TopicName
- .get("persistent://my-property/my-ns/my-partitionedtopic1-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString())
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe();
// produce messages
@@ -608,7 +590,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -621,10 +602,10 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testFairDistributionForPartitionConsumers() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
final int numPartitions = 2;
- final String topicName = "persistent://my-property/my-ns/my-topic-" +
System.currentTimeMillis();
final String producer1Msg = "producer1";
final String producer2Msg = "producer2";
final int queueSize = 10;
@@ -672,7 +653,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName);
log.info("-- Exiting {} test --", methodName);
}
@@ -707,8 +687,8 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test
public void testGetPartitionsForTopic() throws Exception {
+ final String topic = newTopicName();
int numPartitions = 4;
- String topic = "persistent://my-property/my-ns/my-partitionedtopic1-"
+ System.currentTimeMillis();
admin.topics().createPartitionedTopic(topic, numPartitions);
@@ -719,7 +699,7 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
assertEquals(pulsarClient.getPartitionsForTopic(topic).join(),
expectedPartitions);
- String nonPartitionedTopic =
"persistent://my-property/my-ns/my-non-partitionedtopic1";
+ String nonPartitionedTopic = newTopicName();
assertEquals(pulsarClient.getPartitionsForTopic(nonPartitionedTopic).join(),
Collections.singletonList(nonPartitionedTopic));
@@ -727,9 +707,9 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test
public void testMessageIdForSubscribeToSinglePartition() throws Exception {
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
- TopicName topicName = null;
- TopicName partition2TopicName = null;
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
+ String partition2TopicName =
TopicName.get(topicName).getPartition(2).toString();
Producer<byte[]> producer = null;
Consumer<byte[]> consumer1 = null;
Consumer<byte[]> consumer2 = null;
@@ -739,22 +719,19 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
try {
log.info("-- Starting {} test --", methodName);
- topicName =
TopicName.get("persistent://my-property/my-ns/my-topic-" +
System.currentTimeMillis());
- partition2TopicName = topicName.getPartition(2);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
-
- producer = pulsarClient.newProducer().topic(topicName.toString())
+ producer = pulsarClient.newProducer().topic(topicName)
.messageRouter(new AlwaysTwoMessageRouter())
.create();
- consumer1 = pulsarClient.newConsumer().topic(topicName.toString())
+ consumer1 = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("subscriber-partitioned")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
- consumer2 =
pulsarClient.newConsumer().topic(partition2TopicName.toString())
+ consumer2 = pulsarClient.newConsumer().topic(partition2TopicName)
.subscriptionName("subscriber-single")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
@@ -787,7 +764,6 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer2.unsubscribe();
consumer2.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
@@ -808,10 +784,10 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testAutoUpdatePartitionsForProducerConsumer() throws Exception
{
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
final int numPartitions = 2;
- final String topicName = "persistent://my-property/my-ns/my-topic-" +
System.currentTimeMillis();
final String producerMsg = "producerMsg";
final int totalMessages = 30;
@@ -897,38 +873,34 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages);
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName);
log.info("-- Exiting {} test --", methodName);
}
@Test
public void testCustomPartitionedProducer() throws Exception {
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
- TopicName topicName = null;
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
Producer<byte[]> producer = null;
try {
log.info("-- Starting {} test --", methodName);
int numPartitions = 4;
- topicName = TopicName
-
.get("persistent://my-property/my-ns/my-partitionedtopic1-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
RouterWithTopicName router = new RouterWithTopicName();
- producer = pulsarClient.newProducer().topic(topicName.toString())
+ producer = pulsarClient.newProducer().topic(topicName)
.messageRouter(router)
.create();
for (int i = 0; i < 1; i++) {
String message = "my-message-" + i;
producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
}
- assertEquals(router.topicName, topicName.toString());
+ assertEquals(router.topicName, topicName);
} finally {
producer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
}
@@ -942,18 +914,17 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
@Test
public void testPartitionedTopicInterceptor() throws Exception {
log.info("-- Starting {} test --", methodName);
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
// Creates new client connection
+ final String topicName = newTopicName();
+ PulsarClient pulsarClient = newPulsarClient();
int numPartitions = 4;
- TopicName topicName = TopicName
-
.get("persistent://my-property/my-ns/interceptor-partitionedtopic1-" +
System.currentTimeMillis());
- admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
AtomicInteger newProducerPartitions = new AtomicInteger(0);
AtomicInteger newConsumerPartitions = new AtomicInteger(0);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).enableBatching(false)
.autoUpdatePartitions(true).autoUpdatePartitionsInterval(1,
TimeUnit.SECONDS)
.intercept(new ProducerInterceptor<byte[]>() {
@Override
@@ -976,7 +947,7 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
}
}).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-partitioned-subscriber").autoUpdatePartitionsInterval(1,
TimeUnit.SECONDS)
.intercept(new ConsumerInterceptor<byte[]>() {
@@ -1015,7 +986,7 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
}).subscribe();
int newPartitions = numPartitions + 5;
- admin.topics().updatePartitionedTopic(topicName.toString(),
newPartitions);
+ admin.topics().updatePartitionedTopic(topicName, newPartitions);
Awaitility.await().atMost(10000, TimeUnit.SECONDS).until(
() -> newProducerPartitions.get() == newPartitions &&
newConsumerPartitions.get() == newPartitions);
@@ -1029,11 +1000,20 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
consumer.unsubscribe();
consumer.close();
pulsarClient.close();
- admin.topics().deletePartitionedTopic(topicName.toString());
log.info("-- Exiting {} test --", methodName);
}
+ private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T
receivedMessage,
+ T expectedMessage) {
+ // Make sure that messages are received in order
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match the
expected message " + expectedMessage);
+
+ // Make sure that there are no duplicates
+ Assert.assertTrue(messagesReceived.add(receivedMessage), "Received
duplicate message " + receivedMessage);
+ }
+
private static class RouterWithTopicName implements MessageRouter {
static String topicName = null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
index 8e39660369b..bccae594768 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
@@ -38,6 +38,8 @@ public class PersistentTopicTerminateTest extends
SharedPulsarBaseTest {
public void testRecoverAfterTerminate() throws Exception {
final String topicName = newTopicName();
final String subscriptionName = "s1";
+ // Disable dedup so that the pulsar.dedup cursor doesn't prevent
ledger trimming.
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
@@ -64,7 +66,8 @@ public class PersistentTopicTerminateTest extends
SharedPulsarBaseTest {
assertEquals(msg2.getValue(), "2");
// Verify: the ledgers acked will be cleaned up.
- consumer.acknowledgeCumulative(msg2);
+ consumer.close();
+ admin.topics().skipAllMessages(topicName, subscriptionName);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
PersistentTopic persistentTopic =
(PersistentTopic) getTopic(topicName, false).join().get();
@@ -76,7 +79,6 @@ public class PersistentTopicTerminateTest extends
SharedPulsarBaseTest {
});
// Cleanup.
- consumer.close();
admin.topics().delete(topicName, false);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index af4786a987f..f37608f98d6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -30,45 +31,29 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
@Test(groups = "broker-api")
-public class RetryTopicTest extends ProducerConsumerBase {
+public class RetryTopicTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(RetryTopicTest.class);
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test
public void testRetryTopic() throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
@@ -85,9 +70,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -141,7 +126,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test
public void testRetryTopicWithProducerBuilder() throws Exception {
- final String topic =
"persistent://my-property/my-ns/retry-topic-with-producer-builder";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
final int sendMessages = 100;
@@ -166,7 +151,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
.topic(topic + "-" + subscriptionName + "-DLQ")
.subscriptionName(subscriptionNameDLQ)
@@ -227,7 +212,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
*/
@Test
public void testRetryTopicWithExclusiveMode() throws Exception {
- final String topic =
"persistent://my-property/my-ns/retry-topic-exclusive";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
@@ -279,7 +264,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 20000)
public void testAutoConsumeSchemaRetryLetter() throws Exception {
- final String topic =
"persistent://my-property/my-ns/retry-letter-topic";
+ final String topic = newTopicName();
final String subName = "my-subscription";
final String retrySubName = "my-subscription" + "-RETRY";
final int sendMessages = 10;
@@ -379,7 +364,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 60000)
public void testRetryTopicProperties() throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ final String topic = newTopicName();
byte[] key = "key".getBytes();
byte[] orderingKey = "orderingKey".getBytes();
@@ -400,9 +385,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -514,11 +499,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
//Issue 9327: do compatibility check in case of the default retry and dead
letter topic name changed
@Test
public void testRetryTopicNameForCompatibility () throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ final String topic = newTopicName();
- final String oldRetryTopic =
"persistent://my-property/my-ns/my-subscription-RETRY";
+ final String oldRetryTopic = "persistent://" + getNamespace() +
"/my-subscription-RETRY";
- final String oldDeadLetterTopic =
"persistent://my-property/my-ns/my-subscription-DLQ";
+ final String oldDeadLetterTopic = "persistent://" + getNamespace() +
"/my-subscription-DLQ";
final int maxRedeliveryCount = 2;
@@ -537,7 +522,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
.topic(oldDeadLetterTopic)
.subscriptionName("my-subscription")
@@ -598,28 +584,39 @@ public class RetryTopicTest extends ProducerConsumerBase {
*/
@Test
public void testRetryTopicWithMultiTopic() throws Exception {
- final String topic1 = "persistent://my-property/my-ns/retry-topic-1";
- final String topic2 = "persistent://my-property/my-ns/retry-topic-2";
+ final String topic1 = newTopicName();
+ final String topic2 = newTopicName();
final int maxRedeliveryCount = 2;
- int sendMessages = 100;
+ final int sendMessages = 100;
+ final int totalMessages = sendMessages * 2;
+ final String subscriptionName = "my-subscription";
+ // Use an explicit DLQ topic name since the auto-generated name
depends on the
+ // alphabetical ordering of the topics in the TreeSet, which is
unpredictable
+ // with random topic names.
+ final String dlqTopic = newTopicName() + "-DLQ";
// subscribe to the original topics before publish
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic1, topic2)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
-
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(maxRedeliveryCount)
+ .deadLetterTopic(dlqTopic)
+ .build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
- // subscribe to the DLQ topics before consuming original topics
+ // subscribe to the DLQ topic before consuming original topics.
+ @Cleanup
Consumer<byte[]> deadLetterConsumer =
pulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/retry-topic-1-my-subscription-DLQ")
- .subscriptionName("my-subscription")
+ .topic(dlqTopic)
+ .subscriptionName(subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -636,34 +633,34 @@ public class RetryTopicTest extends ProducerConsumerBase {
producer2.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
- sendMessages = sendMessages * 2;
-
producer1.close();
producer2.close();
int totalReceived = 0;
do {
- Message<byte[]> message = consumer.receive();
+ Message<byte[]> message = consumer.receive(30, TimeUnit.SECONDS);
+ assertNotNull(message, "Expected more messages, received " +
totalReceived);
log.info("consumer received message : {} {} - total = {}",
message.getMessageId(), new String(message.getData()),
++totalReceived);
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
- } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+ } while (totalReceived < totalMessages * (maxRedeliveryCount + 1));
int totalInDeadLetter = 0;
do {
- Message message = deadLetterConsumer.receive();
+ Message message = deadLetterConsumer.receive(30, TimeUnit.SECONDS);
+ assertNotNull(message, "Expected more DLQ messages, received " +
totalInDeadLetter);
log.info("dead letter consumer received message : {} {}",
message.getMessageId(),
new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
- } while (totalInDeadLetter < sendMessages);
+ } while (totalInDeadLetter < totalMessages);
- deadLetterConsumer.close();
consumer.close();
+ @Cleanup
Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic1, topic2)
- .subscriptionName("my-subscription")
+ .subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -674,16 +671,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
new String(checkMessage.getData()));
}
assertNull(checkMessage);
-
- checkConsumer.close();
}
@Test
public void testRetryTopicByCustomTopicName() throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
final int sendMessages = 100;
+ final String customRetryTopic = "persistent://" + getNamespace() +
"/my-subscription-custom-Retry";
+
// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
@@ -693,14 +690,14 @@ public class RetryTopicTest extends ProducerConsumerBase {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
-
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+ .retryLetterTopic(customRetryTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscribe();
@@ -730,7 +727,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
deadLetterConsumer.close();
consumer.close();
@Cleanup
- PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient1 = newPulsarClient();
Consumer<byte[]> checkConsumer =
newPulsarClient1.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
@@ -749,8 +746,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
- String retryLetterTopic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
- final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
+ final String topic = newTopicName();
+ String retryLetterTopic = topic + "-RETRY";
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
@@ -791,7 +788,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 30000L)
public void testRetryProducerWillCloseByConsumer() throws Exception {
- final String topicName = "persistent://my-property/my-ns/tp_" +
UUID.randomUUID().toString();
+ final String topicName = newTopicName();
final String subscriptionName = "sub1";
final String topicRetry = topicName + "-" + subscriptionName +
"-RETRY";
final String topicDLQ = topicName + "-" + subscriptionName + "-DLQ";
@@ -838,8 +835,8 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
- String retryLetterTopic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
- final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
+ final String topic = newTopicName();
+ String retryLetterTopic = topic + "-RETRY";
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
@@ -922,18 +919,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
// but for retry topic
@Test
public void
testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws
Exception {
- String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
- admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
- admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ admin.namespaces().setSchemaValidationEnforced(getNamespace(), false);
// set schema compatibility strategy to always compatible
- admin.namespaces().setSchemaCompatibilityStrategy(namespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ admin.namespaces()
+ .setSchemaCompatibilityStrategy(getNamespace(),
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
PayloadIncompatible.class);
- String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
- +
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+ String topic = newTopicName();
String dlqTopic = topic + "-DLQ";
String retryTopic = topic + "-RETRY";
@@ -968,7 +963,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
Thread.sleep(2000L);
-
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+
assertThat(getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be
<= 1 so that it doesn't leak producers",
retryTopic)
.isLessThanOrEqualTo(1);
@@ -983,7 +978,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
}
}
-
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+ assertThat(getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be 0
here",
retryTopic)
.isEqualTo(0);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6c133d6c109..6e872627fb6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -42,6 +43,8 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
@@ -55,26 +58,19 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "flaky")
-public class TopicReaderTest extends ProducerConsumerBase {
+public class TopicReaderTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(TopicReaderTest.class);
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
+ protected String methodName;
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
+ @BeforeMethod(alwaysRun = true)
+ public void setTestMethodName(Method m) {
+ methodName = m.getName();
}
@DataProvider
@@ -115,10 +111,11 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testSimpleReader() throws Exception {
- Reader<byte[]> reader =
pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader")
+ final String topicName = newTopicName();
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();
- Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReader")
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -143,7 +140,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testSimpleMultiReader() throws Exception {
- String topic = "persistent://my-property/my-ns/testSimpleMultiReader";
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
@@ -170,8 +167,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testReaderAfterMessagesWerePublished() throws Exception {
+ final String topicName = newTopicName();
Producer<byte[]> producer =
-
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
+ pulsarClient.newProducer().topic(topicName)
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -179,7 +177,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
Reader<byte[]> reader = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished")
+ .topic(topicName)
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
@@ -200,7 +198,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMultiReaderAfterMessagesWerePublished() throws Exception {
- String topic =
"persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished";
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.create();
@@ -227,8 +225,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMultipleReaders() throws Exception {
+ final String topicName = newTopicName();
Producer<byte[]> producer =
-
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders")
+ pulsarClient.newProducer().topic(topicName)
.create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -236,11 +235,11 @@ public class TopicReaderTest extends ProducerConsumerBase
{
}
Reader<byte[]> reader1 =
-
pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
+ pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();
Reader<byte[]> reader2 =
-
pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders")
+ pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();
Message<byte[]> msg = null;
@@ -271,7 +270,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMultiMultipleReaders() throws Exception {
- final String topic =
"persistent://my-property/my-ns/testMultiMultipleReaders";
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.create();
@@ -309,7 +308,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testTopicStats() throws Exception {
- String topicName = "persistent://my-property/my-ns/testTopicStats";
+ String topicName = newTopicName();
Reader<byte[]> reader1 =
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
@@ -330,7 +329,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMultiTopicStats() throws Exception {
- String topicName =
"persistent://my-property/my-ns/testMultiTopicStats";
+ String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
Reader<byte[]> reader1 =
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
@@ -352,7 +351,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test(dataProvider = "variationsForResetOnLatestMsg")
public void testReaderOnLatestMessage(boolean startInclusive, int
numOfMessages) throws Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderOnLatestMessage";
+ final String topicName = newTopicName();
final int halfOfMsgs = numOfMessages / 2;
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -397,8 +396,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test(dataProvider = "variationsForResetOnLatestMsg")
public void testMultiReaderOnLatestMessage(boolean startInclusive, int
numOfMessages) throws Exception {
- final String topicName =
"persistent://my-property/my-ns/testMultiReaderOnLatestMessage"
- + System.currentTimeMillis();
+ final String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
final int halfOfMsgs = numOfMessages / 2;
@@ -446,8 +444,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testReaderOnSpecificMessage() throws Exception {
+ final String topicName = newTopicName();
Producer<byte[]> producer =
-
pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
+ pulsarClient.newProducer().topic(topicName)
.create();
List<MessageId> messageIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
@@ -456,7 +455,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
Reader<byte[]> reader =
-
pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage")
+ pulsarClient.newReader().topic(topicName)
.startMessageId(messageIds.get(4)).create();
// Publish more messages and verify the readers only sees messages
starting from the intended message
@@ -478,8 +477,9 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testReaderOnSpecificMessageWithBatches() throws Exception {
+ final String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer()
-
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+ .topic(topicName)
.enableBatching(true).batchingMaxPublishDelay(100,
TimeUnit.MILLISECONDS).create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -489,7 +489,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
// Write one sync message to ensure everything before got persistend
producer.send("my-message-10".getBytes());
Reader<byte[]> reader1 = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+ .topic(topicName)
.startMessageId(MessageId.earliest).create();
MessageId lastMessageId = null;
@@ -503,7 +503,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
System.out.println("CREATING READER ON MSG ID: " + lastMessageId);
Reader<byte[]> reader2 = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches")
+ .topic(topicName)
.startMessageId(lastMessageId).create();
for (int i = 5; i < 11; i++) {
@@ -561,13 +561,14 @@ public class TopicReaderTest extends ProducerConsumerBase
{
final int totalMsg = 10;
+ final String topicName = newTopicName();
Set<String> messageSet = new HashSet<>();
Reader<byte[]> reader = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest)
+ .topic(topicName).startMessageId(MessageId.latest)
.cryptoKeyReader(new EncKeyReader()).create();
Producer<byte[]> producer = pulsarClient.newProducer()
-
.topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1")
+ .topic(topicName)
.addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new
EncKeyReader()).create();
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
@@ -632,7 +633,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
final int totalMsg = 10;
Set<String> messageSet = new HashSet<>();
- String topic =
"persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1";
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic).startMessageId(MessageId.latest)
@@ -659,8 +660,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testDefaultCryptoKeyReader() throws Exception {
- final String topic =
"persistent://my-property/my-ns/test-reader-default-crypto-key-reader"
- + System.currentTimeMillis();
+ final String topic = newTopicName();
final String ecdsaPublicKeyFile =
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
final String ecdsaPrivateKeyFile =
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
final String ecdsaPublicKeyData =
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlI"
@@ -789,11 +789,12 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testSimpleReaderReachEndOfTopic() throws Exception {
+ final String topicName = newTopicName();
Reader<byte[]> reader = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
+ .topic(topicName)
.startMessageId(MessageId.earliest).create();
Producer<byte[]> producer = pulsarClient.newProducer()
-
.topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
+ .topic(topicName)
.create();
// no data write, should return false
@@ -847,7 +848,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testSimpleMultiReaderReachEndOfTopic() throws Exception {
- String topic =
"persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic";
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Reader<byte[]> reader =
pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest).create();
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
@@ -901,12 +902,13 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderReachEndOfTopicOnMessageWithBatches() throws
Exception {
+ final String topicName = newTopicName();
Reader<byte[]> reader = pulsarClient.newReader()
-
.topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+ .topic(topicName)
.startMessageId(MessageId.earliest).create();
Producer<byte[]> producer = pulsarClient.newProducer()
-
.topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
+ .topic(topicName)
.enableBatching(true).batchingMaxPublishDelay(100,
TimeUnit.MILLISECONDS).create();
// no data write, should return false
@@ -944,7 +946,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMultiReaderReachEndOfTopicOnMessageWithBatches() throws
Exception {
- String topic =
"persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches";
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
@@ -989,7 +991,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
@Test
public void testMessageAvailableAfterRestart() throws Exception {
- String topic =
"persistent://my-property/my-ns/testMessageAvailableAfterRestart";
+ String topic = newTopicName();
String content = "my-message-1";
// stop retention from cleaning up
@@ -1010,7 +1012,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
}
// cause broker to drop topic. Will be loaded next time we access it
-
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
+ getTopicReference(topic).get().close(false).get();
try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {
@@ -1025,7 +1027,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testMultiReaderMessageAvailableAfterRestart() throws Exception
{
- String topic =
"persistent://my-property/my-ns/testMessageAvailableAfterRestart2";
+ String topic = newTopicName();
String content = "my-message-1";
admin.topics().createPartitionedTopic(topic, 3);
// stop retention from cleaning up
@@ -1046,9 +1048,9 @@ public class TopicReaderTest extends ProducerConsumerBase
{
}
// cause broker to drop topic. Will be loaded next time we access it
- pulsar.getBrokerService().getTopics().keySet().forEach(topicName -> {
+
SharedPulsarCluster.get().getPulsarService().getBrokerService().getTopics().keySet().forEach(topicName
-> {
try {
-
pulsar.getBrokerService().getTopicReference(topicName).get().close(false).get();
+ getTopicReference(topicName).get().close(false).get();
} catch (Exception e) {
fail();
}
@@ -1067,7 +1069,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test(dataProvider = "variationsForHasMessageAvailable")
public void testHasMessageAvailable(boolean enableBatch, boolean
startInclusive) throws Exception {
- final String topicName =
"persistent://my-property/my-ns/HasMessageAvailable";
+ final String topicName = newTopicName();
final int numOfMessage = 100;
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
@@ -1128,7 +1130,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test(timeOut = 20000)
public void testHasMessageAvailable() throws Exception {
- final String topicName =
"persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
+ final String topicName = newTopicName();
final int numOfMessage = 10;
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1205,7 +1207,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws
Exception {
final int numOfMessage = 10;
- final String topicName =
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
+ final String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();
@@ -1229,7 +1231,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testMultiReaderNonDurableIsAbleToSeekRelativeTime() throws
Exception {
final int numOfMessage = 10;
- final String topicName =
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
+ final String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
@@ -1251,7 +1253,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws
Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+ final String topicName = newTopicName();
final int numOfMessage = 10;
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1300,7 +1302,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testMultiReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws
Exception {
- final String topicName =
"persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic";
+ final String topicName = newTopicName();
final int numOfMessage = 10;
admin.topics().createPartitionedTopic(topicName, 3);
@@ -1346,7 +1348,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws
Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
+ final String topicName = newTopicName();
final int numOfMessage = 100;
final int halfMessages = numOfMessage / 2;
@@ -1401,7 +1403,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws
Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
+ final String topicName = newTopicName();
final int numOfMessage = 10;
final int halfMessages = numOfMessage / 2;
@@ -1434,8 +1436,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws
Exception {
- final String topicName =
"persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic"
- + System.currentTimeMillis();
+ final String topicName = newTopicName();
final int numOfMessage = 10;
final int halfMessages = numOfMessage / 2;
admin.topics().createPartitionedTopic(topicName, 3);
@@ -1464,7 +1465,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test(dataProvider = "variationsForExpectedPos")
public void testReaderStartMessageIdAtExpectedPos(boolean batching,
boolean startInclusive, int numOfMessages)
throws Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos";
+ final String topicName = newTopicName();
final int resetIndex = new Random().nextInt(numOfMessages); // Choose
some random index to reset
final int firstMessage = startInclusive ? resetIndex : resetIndex + 1;
// First message of reset
@@ -1526,7 +1527,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderBuilderConcurrentCreate() throws Exception {
- String topicName =
"persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_";
+ String topicName = newTopicName() + "-";
int numTopic = 30;
ReaderBuilder<byte[]> builder =
pulsarClient.newReader().startMessageId(MessageId.earliest);
@@ -1554,7 +1555,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test(timeOut = 10000)
public void testMultiReaderBuilderConcurrentCreate() throws Exception {
- String topicName =
"persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_";
+ String topicName = newTopicName() + "-";
int numTopic = 30;
ReaderBuilder<byte[]> builder =
pulsarClient.newReader().startMessageId(MessageId.earliest);
@@ -1583,7 +1584,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
@Test
public void testReaderStartInMiddleOfBatch() throws Exception {
- final String topicName =
"persistent://my-property/my-ns/ReaderStartInMiddleOfBatch";
+ final String topicName = newTopicName();
final int numOfMessage = 100;
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -1674,4 +1675,11 @@ public class TopicReaderTest extends
ProducerConsumerBase {
assertTrue(r2Inclusive.hasMessageAvailable());
assertTrue(r3.hasMessageAvailable());
}
+
+ private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T
receivedMessage,
+ T expectedMessage) {
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match the
expected message " + expectedMessage);
+ Assert.assertTrue(messagesReceived.add(receivedMessage), "Received
duplicate message " + receivedMessage);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
index fa8b3a7fcf0..ae321c4b2be 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
@@ -28,43 +28,25 @@ import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class ConsumerDecryptFailListenerTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerDecryptFailListenerTest extends SharedPulsarBaseTest {
@Test(timeOut = 10000)
public void testDecryptFailListenerException() {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://my-property/my-ns/testReaderBuildExceptionsWithSetReaderDecryptFailure"
- );
+ final String topic = newTopicName();
// should throw exception if decryptFailListener is set without
setting a messageListener
assertThatThrownBy(
() -> pulsarClient.newConsumer().topic(topic)
@@ -93,9 +75,7 @@ public class ConsumerDecryptFailListenerTest extends
ProducerConsumerBase {
@Test(timeOut = 20000)
public void testDecryptFailListenerBehaviorWithConsumerImpl() throws
Exception {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
- );
+ final String topic = newTopicName();
admin.topics().createNonPartitionedTopic(topic);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topic)
.decryptFailListener(((reader, msg) -> {
@@ -125,9 +105,7 @@ public class ConsumerDecryptFailListenerTest extends
ProducerConsumerBase {
@Test(timeOut = 20000)
public void testDecryptFailListenerBehaviorWithMultiConsumerImpl() throws
Exception {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithMultiConsumerImpl"
- );
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
MultiTopicsConsumerImpl<byte[]> consumer1 =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(topic)
@@ -159,9 +137,7 @@ public class ConsumerDecryptFailListenerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testDecryptFailListenerReceiveMessage() throws Exception {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://my-property/my-ns/testDecryptFailListenerReceiveMessage"
- );
+ final String topic = newTopicName();
admin.topics().createNonPartitionedTopic(topic);
int totalMessages = 10;
CountDownLatch countDownLatch = new CountDownLatch(10);
@@ -199,9 +175,7 @@ public class ConsumerDecryptFailListenerTest extends
ProducerConsumerBase {
*/
@Test(timeOut = 30000)
public void testBothDecryptFailListenerAndMessageListenerReceiveMessage()
throws Exception {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://my-property/my-ns/testBothDecryptFailListenerAndMessageListenerReceiveMessage"
- );
+ final String topic = newTopicName();
int totalMessages = 10;
CountDownLatch decryptSuccessCount = new CountDownLatch(5);
CountDownLatch decryptFailCount = new CountDownLatch(5);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 203715ca7db..ff31409a241 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
@@ -43,39 +44,23 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-impl")
-public class MessageChunkingSharedTest extends ProducerConsumerBase {
+public class MessageChunkingSharedTest extends SharedPulsarBaseTest {
private static final int MAX_MESSAGE_SIZE = 100;
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test
public void testSingleConsumer() throws Exception {
- final String topic = "my-property/my-ns/test-single-consumer";
+ final String topic = newTopicName();
@Cleanup final Producer<String> producer = createProducer(topic);
@Cleanup final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -108,7 +93,7 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
@Test
public void testMultiConsumers() throws Exception {
- final String topic = "my-property/my-ns/test-multi-consumers";
+ final String topic = newTopicName();
@Cleanup final Producer<String> producer = createProducer(topic);
final ConsumerBuilder<String> consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -148,7 +133,7 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
@Test
public void testInterleavedChunks() throws Exception {
- final String topic =
"persistent://my-property/my-ns/test-interleaved-chunks";
+ final String topic = newTopicName();
final ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
@@ -158,8 +143,7 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
(consumer, msg) ->
receivedUuidList1.add(msg.getProducerName() + "-" + msg.getSequenceId()))
.consumerName("consumer-1")
.subscribe();
- final PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
- .getTopicIfExists(topic).get().orElse(null);
+ final PersistentTopic persistentTopic = (PersistentTopic)
getTopicIfExists(topic).get().orElse(null);
assertNotNull(persistentTopic);
// send: A-0, A-1-0-3, A-1-1-3, B-0, B-1-0-2
sendNonChunk(persistentTopic, "A", 0);
@@ -196,7 +180,7 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
// Issue #25220
@Test
public void testNegativeAckChunkedMessage() throws Exception {
- final String topic =
"persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
+ final String topic = newTopicName();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
index a458d710697..5eb5cc050b0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
@@ -25,36 +25,21 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.TopicMetadata;
import
org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-impl")
-public class PartialPartitionedProducerTest extends ProducerConsumerBase {
- @Override
- @BeforeClass
- public void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class PartialPartitionedProducerTest extends SharedPulsarBaseTest {
@Test
public void testPtWithSinglePartition() throws Throwable {
- final String topic =
BrokerTestUtil.newUniqueName("pt-with-single-routing");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 10);
@Cleanup
@@ -74,7 +59,7 @@ public class PartialPartitionedProducerTest extends
ProducerConsumerBase {
@Test
public void testPtWithPartialPartition() throws Throwable {
- final String topic =
BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 10);
@Cleanup
@@ -96,7 +81,7 @@ public class PartialPartitionedProducerTest extends
ProducerConsumerBase {
// AddPartitionTest
@Test
public void testPtLazyLoading() throws Throwable {
- final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 10);
@Cleanup
@@ -128,7 +113,7 @@ public class PartialPartitionedProducerTest extends
ProducerConsumerBase {
@Test
public void testPtLoadingNotSharedMode() throws Throwable {
- final String topic =
BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 10);
@Cleanup
@@ -162,7 +147,7 @@ public class PartialPartitionedProducerTest extends
ProducerConsumerBase {
// AddPartitionAndLimitTest
@Test
public void testPtUpdateWithPartialPartition() throws Throwable {
- final String topic =
BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 2);
final Field field =
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
@@ -214,7 +199,7 @@ public class PartialPartitionedProducerTest extends
ProducerConsumerBase {
@Test
public void testPtUpdateNotSharedMode() throws Throwable {
- final String topic =
BrokerTestUtil.newUniqueName("pt-update-not-shared");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 2);
final Field field =
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
index e18dc372f8c..205211bff73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java
@@ -31,12 +31,11 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
@@ -46,36 +45,17 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.MockedStatic;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class ProducerMemoryLeakTest extends ProducerConsumerBase {
+public class ProducerMemoryLeakTest extends SharedPulsarBaseTest {
- private static final String NAMESPACE_NEVER_COMPATIBLE =
"public/schema-never-compatible";
-
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE);
-
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE,
- SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
@Test
public void testSendQueueIsFull() throws Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<String> producer = (ProducerImpl<String>)
pulsarClient.newProducer(Schema.STRING)
.blockIfQueueFull(false).maxPendingMessages(1)
@@ -126,7 +106,7 @@ public class ProducerMemoryLeakTest extends
ProducerConsumerBase {
@Test(dataProvider = "maxMessageSizeAndCompressions")
public void testSendMessageSizeExceeded(int maxMessageSize,
CompressionType compressionType) throws Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<String> producer = (ProducerImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topicName)
.compressionType(compressionType)
@@ -205,7 +185,7 @@ public class ProducerMemoryLeakTest extends
ProducerConsumerBase {
@Test(dataProvider = "maxMessageSizes")
public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws
Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<String> producer = (ProducerImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topicName)
.enableBatching(true)
@@ -249,7 +229,7 @@ public class ProducerMemoryLeakTest extends
ProducerConsumerBase {
@Test
public void testSendAfterClosedProducer() throws Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<String> producer =
(ProducerImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
@@ -278,8 +258,9 @@ public class ProducerMemoryLeakTest extends
ProducerConsumerBase {
@Test
public void testBrokenSchema() throws Exception {
- final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ NAMESPACE_NEVER_COMPATIBLE
- + "/tp");
+ admin.namespaces().setSchemaCompatibilityStrategy(getNamespace(),
+ SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl producer =
(ProducerImpl)
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
@@ -332,7 +313,7 @@ public class ProducerMemoryLeakTest extends
ProducerConsumerBase {
@Test(dataProvider = "failedInterceptAt")
public void testInterceptorError(String method) throws Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
+ final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
ProducerImpl<String> producer = (ProducerImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topicName)
.intercept(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
index 17c4271b873..17fab63189d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarMultiHostClientTest.java
@@ -21,56 +21,50 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class PulsarMultiHostClientTest extends ProducerConsumerBase {
+public class PulsarMultiHostClientTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(PulsarMultiHostClientTest.class);
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
+ protected String methodName;
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
+ @BeforeMethod(alwaysRun = true)
+ public void setTestMethodName(Method m) {
+ methodName = m.getName();
}
@Test
public void testGetPartitionedTopicMetaData() {
log.info("-- Starting {} test --", methodName);
- final String topicName = "persistent://my-property/my-ns/my-topic1";
+ final String topicName = newTopicName();
final String subscriptionName = "my-subscriber-name";
try {
- String url = pulsar.getWebServiceAddress();
- if (isTcpLookup) {
- url = pulsar.getBrokerServiceUrl();
- }
@Cleanup
- PulsarClient client = newPulsarClient(url, 0);
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(getWebServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
Consumer<byte[]> consumer =
client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -90,7 +84,7 @@ public class PulsarMultiHostClientTest extends
ProducerConsumerBase {
public void testGetPartitionedTopicDataTimeout() {
log.info("-- Starting {} test --", methodName);
- final String topicName = "persistent://my-property/my-ns/my-topic1";
+ final String topicName = newTopicName();
String url = "http://localhost:" + getFreePort() + ",localhost:" +
getFreePort();
@@ -124,17 +118,17 @@ public class PulsarMultiHostClientTest extends
ProducerConsumerBase {
public void testMultiHostUrlRetrySuccess() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String topicName = "persistent://my-property/my-ns/my-topic1";
+ final String topicName = newTopicName();
final String subscriptionName = "my-subscriber-name";
// Multi hosts included an unreached port and the actual port for
verify retry logic
String urlsWithUnreached = "http://localhost:51000,localhost:"
- + new URI(pulsar.getWebServiceAddress()).getPort();
- if (isTcpLookup) {
- urlsWithUnreached = "pulsar://localhost:51000,localhost" + new
URI(pulsar.getBrokerServiceUrl()).getPort();
- }
+ + new URI(getWebServiceUrl()).getPort();
@Cleanup
- PulsarClient client = newPulsarClient(urlsWithUnreached, 0);
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(urlsWithUnreached)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
Consumer<byte[]> consumer =
client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
@@ -153,7 +147,8 @@ public class PulsarMultiHostClientTest extends
ProducerConsumerBase {
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
- testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ Assert.assertEquals(receivedMessage, expectedMessage);
+ Assert.assertTrue(messageSet.add(receivedMessage), "Duplicate
message: " + receivedMessage);
}
// Acknowledge the consumption of all messages at once