This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0163ce8677b [cleanup] Convert 12 test classes to SharedPulsarBaseTest (#25392)
0163ce8677b is described below
commit 0163ce8677bbb72ec959052d55d0ce43918ca469
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 24 08:05:06 2026 -0700
[cleanup] Convert 12 test classes to SharedPulsarBaseTest (#25392)
---
.../broker/admin/MaxUnackedMessagesTest.java | 213 ++++++++++-----------
.../RGUsageMTAggrWaitForAllMsgsTest.java | 44 ++---
.../ResourceGroupUsageAggregationTest.java | 44 ++---
.../broker/service/schema/ClientGetSchemaTest.java | 149 +++++---------
.../stats/BookieClientsStatsGeneratorTest.java | 22 +--
.../pulsar/client/api/ConsumerRedeliveryTest.java | 185 ++++++++----------
.../pulsar/client/api/DeadLetterTopicTest.java | 148 +++++++-------
.../client/api/PatternMultiTopicsConsumerTest.java | 40 ++--
.../pulsar/client/api/UnloadSubscriptionTest.java | 134 ++++++-------
.../apache/pulsar/client/impl/MessageIdTest.java | 22 +--
.../PerMessageUnAcknowledgedRedeliveryTest.java | 28 +--
.../pulsar/client/impl/ProducerCloseTest.java | 58 ++----
.../pulsar/client/impl/TopicDoesNotExistsTest.java | 50 ++---
.../processor/MessagePayloadProcessorTest.java | 36 +---
14 files changed, 479 insertions(+), 694 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
index 9a8e8fa5a50..7a67fb22198 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -27,50 +27,29 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
-public class MaxUnackedMessagesTest extends ProducerConsumerBase {
- private final String testTenant = "my-property";
- private final String testNamespace = "my-ns";
- private final String myNamespace = testTenant + "/" + testNamespace;
- private final String testTopic = "persistent://" + myNamespace +
"/max-unacked-";
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class MaxUnackedMessagesTest extends SharedPulsarBaseTest {
@Test(timeOut = 10000)
public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
- final String topicName = testTopic + UUID.randomUUID().toString();
+ final String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max =
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
@@ -89,97 +68,99 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
// See https://github.com/apache/pulsar/issues/5438
@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnSubscription() throws Exception {
- final String topicName = testTopic + System.currentTimeMillis();
- final String subscriberName = "test-sub" + System.currentTimeMillis();
+ final String topicName = newTopicName();
+ final String subscriberName = "test-sub";
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 200;
-
pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
- ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName)
-
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
- .subscriptionType(SubscriptionType.Shared);
- Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
- Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
- Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
- List<Consumer<?>> consumers = List.of(consumer1, consumer2, consumer3);
- waitCacheInit(topicName);
- admin.topics().setMaxUnackedMessagesOnSubscription(topicName,
unackMsgAllowed);
- Awaitility.await().untilAsserted(()
- ->
assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ int origValue = getConfig().getMaxUnackedMessagesPerSubscription();
+ getConfig().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
+ try {
+ ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName)
+
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
+ .subscriptionType(SubscriptionType.Shared);
+ Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+ Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+ Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+ List<Consumer<?>> consumers = List.of(consumer1, consumer2,
consumer3);
+ waitCacheInit(topicName);
+ admin.topics().setMaxUnackedMessagesOnSubscription(topicName,
unackMsgAllowed);
+ Awaitility.await().untilAsserted(()
+ ->
assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
- // (1) Produced Messages
- for (int i = 0; i < totalProducedMsgs; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
- }
+ // (1) Produced Messages
+ for (int i = 0; i < totalProducedMsgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
- // (2) try to consume messages: but will be able to consume number of
messages = unackMsgAllowed
- Message<?> msg = null;
- Map<Message<?>, Consumer<?>> messages = new HashMap<>();
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < totalProducedMsgs; j++) {
- msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
- if (msg != null) {
- messages.put(msg, consumers.get(i));
- } else {
- break;
+ // (2) try to consume messages: but will be able to consume number
of messages = unackMsgAllowed
+ Message<?> msg = null;
+ Map<Message<?>, Consumer<?>> messages = new HashMap<>();
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < totalProducedMsgs; j++) {
+ msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
+ if (msg != null) {
+ messages.put(msg, consumers.get(i));
+ } else {
+ break;
+ }
}
}
- }
- // client must receive number of messages = unAckedMessagesBufferSize
rather all produced messages: check
- // delta as 3 consumers with receiverQueueSize = 10
- assertEquals(unackMsgAllowed, messages.size(), receiverQueueSize * 3);
+ // client must receive number of messages =
unAckedMessagesBufferSize rather all produced messages
+ assertEquals(unackMsgAllowed, messages.size(), receiverQueueSize *
3);
- // start acknowledging messages
- messages.forEach((m, c) -> {
- try {
- c.acknowledge(m);
- } catch (PulsarClientException e) {
- fail("ack failed", e);
- }
- });
+ // start acknowledging messages
+ messages.forEach((m, c) -> {
+ try {
+ c.acknowledge(m);
+ } catch (PulsarClientException e) {
+ fail("ack failed", e);
+ }
+ });
- // try to consume remaining messages: broker may take time to deliver
so, retry multiple time to consume
- // all messages
- Set<MessageId> result = ConcurrentHashMap.newKeySet();
- // expecting messages which are not received
- int expectedRemainingMessages = totalProducedMsgs - messages.size();
- CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
- for (int i = 0; i < consumers.size(); i++) {
- final int consumerCount = i;
- for (int j = 0; j < totalProducedMsgs; j++) {
- consumers.get(i).receiveAsync().thenAccept(m -> {
- result.add(m.getMessageId());
- try {
- consumers.get(consumerCount).acknowledge(m);
- } catch (PulsarClientException e) {
- fail("failed to ack msg", e);
- }
- latch.countDown();
- });
+ // try to consume remaining messages
+ Set<MessageId> result = ConcurrentHashMap.newKeySet();
+ int expectedRemainingMessages = totalProducedMsgs -
messages.size();
+ CountDownLatch latch = new
CountDownLatch(expectedRemainingMessages);
+ for (int i = 0; i < consumers.size(); i++) {
+ final int consumerCount = i;
+ for (int j = 0; j < totalProducedMsgs; j++) {
+ consumers.get(i).receiveAsync().thenAccept(m -> {
+ result.add(m.getMessageId());
+ try {
+ consumers.get(consumerCount).acknowledge(m);
+ } catch (PulsarClientException e) {
+ fail("failed to ack msg", e);
+ }
+ latch.countDown();
+ });
+ }
}
- }
- latch.await(10, TimeUnit.SECONDS);
+ latch.await(10, TimeUnit.SECONDS);
- // total received-messages should match to produced messages (it may
have duplicate messages)
- assertEquals(result.size(), expectedRemainingMessages);
+ // total received-messages should match to produced messages (it
may have duplicate messages)
+ assertEquals(result.size(), expectedRemainingMessages);
- producer.close();
- consumers.forEach(c -> {
- try {
- c.close();
- } catch (PulsarClientException e) {
- }
- });
+ producer.close();
+ consumers.forEach(c -> {
+ try {
+ c.close();
+ } catch (PulsarClientException e) {
+ }
+ });
+ } finally {
+ getConfig().setMaxUnackedMessagesPerSubscription(origValue);
+ }
}
@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
- final String topicName = testTopic + UUID.randomUUID().toString();
+ final String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max =
admin.topics().getMaxUnackedMessagesOnConsumer(topicName);
@@ -197,22 +178,22 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnConsumerAppliedApi() throws Exception {
- final String topicName = testTopic + UUID.randomUUID().toString();
+ final String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max =
admin.topics().getMaxUnackedMessagesOnConsumer(topicName, true);
- assertEquals(max.intValue(),
pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer());
+ assertEquals(max.intValue(),
getConfig().getMaxUnackedMessagesPerConsumer());
- admin.namespaces().setMaxUnackedMessagesPerConsumer(myNamespace, 15);
+ admin.namespaces().setMaxUnackedMessagesPerConsumer(getNamespace(),
15);
Awaitility.await().untilAsserted(()
- ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace).intValue(),
15));
- admin.namespaces().removeMaxUnackedMessagesPerConsumer(myNamespace);
+ ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace()).intValue(),
15));
+ admin.namespaces().removeMaxUnackedMessagesPerConsumer(getNamespace());
Awaitility.await().untilAsserted(()
- ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace),
null));
+ ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace()),
null));
- admin.namespaces().setMaxUnackedMessagesPerConsumer(myNamespace, 10);
+ admin.namespaces().setMaxUnackedMessagesPerConsumer(getNamespace(),
10);
Awaitility.await().untilAsserted(()
- ->
assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(myNamespace)));
+ ->
assertNotNull(admin.namespaces().getMaxUnackedMessagesPerConsumer(getNamespace())));
max = admin.topics().getMaxUnackedMessagesOnConsumer(topicName, true);
assertEquals(max.intValue(), 10);
@@ -225,36 +206,37 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
@Test
public void testMaxUnackedMessagesOnSubApplied() throws Exception {
- final String topicName = testTopic + UUID.randomUUID().toString();
+ final String topicName = newTopicName();
waitCacheInit(topicName);
-
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
+
assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()));
assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName,
true),
- Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription()));
+
Integer.valueOf(getConfig().getMaxUnackedMessagesPerSubscription()));
- admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace,
10);
+
admin.namespaces().setMaxUnackedMessagesPerSubscription(getNamespace(), 10);
Awaitility.await().untilAsserted(()
- ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace),
+ ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()),
Integer.valueOf(10)));
admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 20);
Awaitility.await().untilAsserted(()
- ->
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName),
Integer.valueOf(20)));
+ ->
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName),
+ Integer.valueOf(20)));
admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
Awaitility.await().untilAsserted(()
- ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace),
+ ->
assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(getNamespace()),
Integer.valueOf(10)));
-
admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace);
+
admin.namespaces().removeMaxUnackedMessagesPerSubscription(getNamespace());
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName,
true),
- Integer.valueOf(conf.getMaxUnackedMessagesPerSubscription()));
+
Integer.valueOf(getConfig().getMaxUnackedMessagesPerSubscription()));
}
@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnConsumer() throws Exception {
- final String topicName = testTopic + System.currentTimeMillis();
- final String subscriberName = "test-sub" + System.currentTimeMillis();
+ final String topicName = newTopicName();
+ final String subscriberName = "test-sub";
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 300;
@@ -335,6 +317,5 @@ public class MaxUnackedMessagesTest extends ProducerConsumerBase {
private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
- TopicName topic = TopicName.get(topicName);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 2b45d955926..094a5591831 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -25,28 +25,26 @@ import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -57,11 +55,10 @@ import org.testng.annotations.Test;
// are verified on the RGs.
@Slf4j
@Test(groups = "flaky")
-public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
+public class RGUsageMTAggrWaitForAllMsgsTest extends SharedPulsarBaseTest {
+ @org.testng.annotations.BeforeClass(alwaysRun = true)
+ public void setupRG() throws Exception {
+ PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
this.prepareForOps();
ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator()
{
@@ -87,12 +84,6 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
Thread.sleep(2000);
}
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test
public void testMTProduceConsumeRGUsagePersistentTopicNamesSameTenant()
throws Exception {
testProduceConsumeUsageOnRG(persistentTopicNamesSameTenantAndNsRGs);
@@ -547,7 +538,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
int scaleFactor, boolean checkProduce,
boolean checkConsume) throws Exception {
- BrokerService bs = pulsar.getBrokerService();
+ BrokerService bs =
SharedPulsarCluster.get().getPulsarService().getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
log.debug("verifyProdConsStats: topicStatsMap has {} entries",
topicStatsMap.size());
@@ -779,7 +770,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
new org.apache.pulsar.common.policies.data.ResourceGroup();
private ResourceGroupService rgservice;
- private final String clusterName = "test";
+ private final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
private static final String BaseRGName = "rg-";
private static final String BaseTestTopicName = "rgusage-topic-";
@@ -837,8 +828,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
long residualRecvdNumMessages;
// Create the topics provided
- private void createTopics(String[] topics) {
- BrokerService bs = this.pulsar.getBrokerService();
+ private void createTopics(String[] topics) throws Exception {
+ BrokerService bs =
SharedPulsarCluster.get().getPulsarService().getBrokerService();
for (String topic : topics) {
if (!createdTopics.contains(topic)) {
bs.getOrCreateTopic(topic);
@@ -848,8 +839,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
}
// Destroy the topics provided
- private void destroyTopics(String[] topics) {
- BrokerService bs = this.pulsar.getBrokerService();
+ private void destroyTopics(String[] topics) throws Exception {
+ BrokerService bs =
SharedPulsarCluster.get().getPulsarService().getBrokerService();
for (String topic : topics) {
if (!createdTopics.contains(topic)) {
bs.deleteTopic(topic, true);
@@ -873,10 +864,11 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
}
// Initial set up for transport manager and cluster creation.
- private void prepareForOps() throws PulsarAdminException {
-
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
- this.conf.setAllowAutoTopicCreation(true);
- admin.clusters().createCluster(clusterName,
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+ private void prepareForOps() throws Exception {
+ SharedPulsarCluster.get().getPulsarService().getConfiguration()
+
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
+ // Cluster already created by SharedPulsarCluster
}
// Set up of RG/tenant/namespaces/topic names, and checking of the test
parameters.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
index ef20ac9d8f0..9c5aa8d7487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java
@@ -23,37 +23,34 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
+public class ResourceGroupUsageAggregationTest extends SharedPulsarBaseTest {
+ @org.testng.annotations.BeforeClass(alwaysRun = true)
+ public void setupRG() throws Exception {
+ PulsarService pulsar = SharedPulsarCluster.get().getPulsarService();
this.prepareData();
ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator()
{
@@ -74,12 +71,6 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
this.rgs = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS,
transportMgr, dummyQuotaCalc);
}
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test
public void testProduceConsumeUsageOnRG() throws Exception {
testProduceConsumeUsageOnRG(produceConsumePersistentTopic);
@@ -183,7 +174,8 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
consumer.close();
// cleanup the topic data.
- CompletableFuture<Optional<Topic>> topicFuture =
pulsar.getBrokerService().getTopics().remove(topicString);
+ CompletableFuture<Optional<Topic>> topicFuture =
+
SharedPulsarCluster.get().getPulsarService().getBrokerService().getTopics().remove(topicString);
if (topicFuture != null) {
Optional<Topic> optTopic = topicFuture.join();
if (optTopic.isPresent()) {
@@ -210,8 +202,8 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
int sentNumBytes, int sentNumMsgs,
int recvdNumBytes, int recvdNumMsgs,
boolean checkProduce, boolean checkConsume)
- throws
InterruptedException, PulsarAdminException {
- BrokerService bs = pulsar.getBrokerService();
+ throws
Exception {
+ BrokerService bs =
SharedPulsarCluster.get().getPulsarService().getBrokerService();
Awaitility.await().untilAsserted(() -> {
TopicStatsImpl topicStats = bs.getTopicStats().get(topicString);
Assert.assertNotNull(topicStats);
@@ -272,15 +264,15 @@ public class ResourceGroupUsageAggregationTest extends ProducerConsumerBase {
private static final int PUBLISH_INTERVAL_SECS = 300;
// Initial set up for transport manager and producer/consumer
clusters/tenants/namespaces/topics.
- private void prepareData() throws PulsarAdminException {
-
this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+ private void prepareData() throws Exception {
+ SharedPulsarCluster.get().getPulsarService().getConfiguration()
+
.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
- this.conf.setAllowAutoTopicCreation(true);
+
SharedPulsarCluster.get().getPulsarService().getConfiguration().setAllowAutoTopicCreation(true);
- final String clusterName = "test";
- admin.clusters().createCluster(clusterName,
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
- admin.tenants().createTenant(tenantName,
- new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"),
Sets.newHashSet(clusterName)));
+ final String clusterName = SharedPulsarCluster.CLUSTER_NAME;
+ admin.tenants().createTenant(tenantName,
+ new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"),
Sets.newHashSet(clusterName)));
admin.namespaces().createNamespace(tenantAndNsName);
admin.namespaces().setNamespaceReplicationClusters(tenantAndNsName,
Sets.newHashSet(clusterName), false);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index f38871f71b3..c9540f06037 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -18,89 +18,37 @@
*/
package org.apache.pulsar.broker.service.schema;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static
org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.schema.Schemas;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ClientGetSchemaTest extends ProducerConsumerBase {
-
- private static final String topicBytes = "my-property/my-ns/topic-bytes";
- private static final String topicString = "my-property/my-ns/topic-string";
- private static final String topicJson = "my-property/my-ns/topic-json";
- private static final String topicAvro = "my-property/my-ns/topic-avro";
- private static final String topicJsonNotNull =
"my-property/my-ns/topic-json-not-null";
- private static final String topicAvroNotNull =
"my-property/my-ns/topic-avro-not-null";
-
- List<Producer<?>> producers = new ArrayList<>();
+public class ClientGetSchemaTest extends SharedPulsarBaseTest {
private static class MyClass {
public String name;
public int age;
}
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
-
- // Create few topics with different types
-
producers.add(pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create());
-
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
-
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
-
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
-
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder()
- .withPojo(MyClass.class).build())).topic(topicAvro).create());
-
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder()
- .withPojo(MyClass.class).build())).topic(topicJson).create());
-
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder()
-
.withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
-
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder()
-
.withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());
-
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- producers.forEach(t -> {
- try {
- t.close();
- } catch (PulsarClientException e) {
- }
- });
- super.internalCleanup();
- }
-
@DataProvider(name = "serviceUrl")
public Object[] serviceUrls() {
return new Object[] {
- stringSupplier(() -> getPulsar().getBrokerServiceUrl()),
- stringSupplier(() -> getPulsar().getWebServiceAddress())
+ stringSupplier(() -> getBrokerServiceUrl()),
+ stringSupplier(() -> getWebServiceUrl())
};
}
@@ -110,6 +58,19 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
@Test(dataProvider = "serviceUrl")
public void testGetSchema(Supplier<String> serviceUrl) throws Exception {
+ String topicBytes = newTopicName();
+ String topicString = newTopicName();
+ String topicJson = newTopicName();
+ String topicAvro = newTopicName();
+
+ // Create topics with different schema types
+ @Cleanup Producer<byte[]> pBytes =
pulsarClient.newProducer(Schema.BYTES).topic(topicBytes).create();
+ @Cleanup Producer<String> pString =
pulsarClient.newProducer(Schema.STRING).topic(topicString).create();
+ @Cleanup Producer<MyClass> pAvro =
+
pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create();
+ @Cleanup Producer<MyClass> pJson =
+
pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create();
+
@Cleanup
PulsarClientImpl client = (PulsarClientImpl)
PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
@@ -123,51 +84,44 @@ public class ClientGetSchemaTest extends
ProducerConsumerBase {
/**
* It validates if schema ledger is deleted or non recoverable then it
will clean up schema storage for the topic
* and make the topic available.
- *
- * @throws Exception
*/
@Test
public void testSchemaFailure() throws Exception {
- final String tenant = PUBLIC_TENANT;
- final String namespace = "test-namespace-" + randomName(16);
- final String topicOne = "test-broken-schema-storage";
- final String fqtnOne = TopicName.get(TopicDomain.persistent.value(),
tenant, namespace, topicOne).toString();
-
- admin.namespaces().createNamespace(tenant + "/" + namespace,
Sets.newHashSet("test"));
+ final String topicOne = newTopicName();
// (1) create topic with schema
Producer<Schemas.PersonTwo> producer = pulsarClient
- .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
- .topic(fqtnOne).create();
+ .topic(topicOne).create();
producer.close();
- String key = TopicName.get(fqtnOne).getSchemaName();
- BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage)
pulsar.getSchemaStorage();
- long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+ String key = TopicName.get(topicOne).getSchemaName();
+ BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage)
getSchemaStorage();
+ long schemaLedgerId = schemaStorage.getSchemaLedgerList(key).get(0);
// (2) break schema locator by deleting schema-ledger
- schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+ schemaStorage.getBookKeeper().deleteLedger(schemaLedgerId);
- admin.topics().unload(fqtnOne);
+ admin.topics().unload(topicOne);
// (3) create topic again: broker should handle broken schema and load
the topic successfully
producer = pulsarClient
- .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
- .topic(fqtnOne).create();
+ .topic(topicOne).create();
- assertNotEquals(schemaLedgerId,
schemaStrogate.getSchemaLedgerList(key).get(0));
+ assertNotEquals(schemaLedgerId,
schemaStorage.getSchemaLedgerList(key).get(0));
Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
personTwo.setId(1);
personTwo.setName("Tom");
Consumer<Schemas.PersonTwo> consumer = pulsarClient
- .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
+
.newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
- .subscriptionName("test").topic(fqtnOne).subscribe();
+ .subscriptionName("test").topic(topicOne).subscribe();
producer.send(personTwo);
@@ -181,33 +135,34 @@ public class ClientGetSchemaTest extends
ProducerConsumerBase {
@Test
public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception {
- final String tenant = PUBLIC_TENANT;
- final String namespace = "test-namespace-" + randomName(16);
- final String topicOne = "test-deleted-schema-ledger";
- final String fqtnOne = TopicName.get(TopicDomain.persistent.value(),
tenant, namespace, topicOne).toString();
+ final String topicOne = newTopicName();
- pulsar.getConfig().setSchemaLedgerForceRecovery(true);
- admin.namespaces().createNamespace(tenant + "/" + namespace,
Sets.newHashSet("test"));
+ boolean origValue = getConfig().isSchemaLedgerForceRecovery();
+ getConfig().setSchemaLedgerForceRecovery(true);
+ try {
+ // (1) create topic with schema
+ Producer<Schemas.PersonTwo> producer = pulsarClient
+
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull(false)
+
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
+ .topic(topicOne).create();
- // (1) create topic with schema
- Producer<Schemas.PersonTwo> producer = pulsarClient
- .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>
builder().withAlwaysAllowNull(false)
-
.withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build()))
- .topic(fqtnOne).create();
+ producer.close();
- producer.close();
-
- String key = TopicName.get(fqtnOne).getSchemaName();
- BookkeeperSchemaStorage schemaStrogate = (BookkeeperSchemaStorage)
pulsar.getSchemaStorage();
- long schemaLedgerId = schemaStrogate.getSchemaLedgerList(key).get(0);
+ String key = TopicName.get(topicOne).getSchemaName();
+ BookkeeperSchemaStorage schemaStorage = (BookkeeperSchemaStorage)
getSchemaStorage();
+ long schemaLedgerId =
schemaStorage.getSchemaLedgerList(key).get(0);
- // (2) break schema locator by deleting schema-ledger
- schemaStrogate.getBookKeeper().deleteLedger(schemaLedgerId);
+ // (2) break schema locator by deleting schema-ledger
+ schemaStorage.getBookKeeper().deleteLedger(schemaLedgerId);
- admin.topics().unload(fqtnOne);
+ admin.topics().unload(topicOne);
- Producer<byte[]> producerWihtoutSchema =
pulsarClient.newProducer().topic(fqtnOne).create();
+ @Cleanup
+ Producer<byte[]> producerWithoutSchema =
pulsarClient.newProducer().topic(topicOne).create();
- assertNotNull(producerWihtoutSchema);
+ assertNotNull(producerWithoutSchema);
+ } finally {
+ getConfig().setSchemaLedgerForceRecovery(origValue);
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
index 9974b834621..d5924ecb510 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BookieClientsStatsGeneratorTest.java
@@ -24,31 +24,19 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Map;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.common.stats.JvmMetrics;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class BookieClientsStatsGeneratorTest extends BrokerTestBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class BookieClientsStatsGeneratorTest extends SharedPulsarBaseTest {
@Test
public void testBookieClientStatsGenerator() throws Exception {
// should not generate any NPE or other exceptions..
- Map<String, Map<String, PendingBookieOpsStats>> stats =
BookieClientStatsGenerator.generate(super.getPulsar());
+ Map<String, Map<String, PendingBookieOpsStats>> stats =
+
BookieClientStatsGenerator.generate(SharedPulsarCluster.get().getPulsarService());
assertTrue(stats.isEmpty());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index ae0298ade3d..c186dd1f032 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -36,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -43,30 +43,14 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class ConsumerRedeliveryTest extends ProducerConsumerBase {
+public class ConsumerRedeliveryTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(ConsumerRedeliveryTest.class);
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- conf.setManagedLedgerCacheEvictionIntervalMs(10000);
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
@@ -97,76 +81,81 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
*/
@Test(dataProvider = "ackReceiptEnabled")
public void testOrderedRedelivery(boolean ackReceiptEnabled) throws
Exception {
- String topic = "persistent://my-property/my-ns/redelivery-" +
System.currentTimeMillis();
-
- conf.setManagedLedgerMaxEntriesPerLedger(2);
- conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
-
- @Cleanup
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .producerName("my-producer-name")
- .create();
- ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
- .subscriptionType(SubscriptionType.Shared)
- .isAckReceiptEnabled(ackReceiptEnabled);
- ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
consumerBuilder.subscribe();
-
- final int totalMsgs = 100;
-
- for (int i = 0; i < totalMsgs; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
- }
-
+ String topic = newTopicName();
+
+ int origMaxEntries = getConfig().getManagedLedgerMaxEntriesPerLedger();
+ int origMinRollover =
getConfig().getManagedLedgerMinLedgerRolloverTimeMinutes();
+ getConfig().setManagedLedgerMaxEntriesPerLedger(2);
+ getConfig().setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ try {
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .producerName("my-producer-name")
+ .create();
+ ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topic).subscriptionName("s1")
+ .subscriptionType(SubscriptionType.Shared)
+ .isAckReceiptEnabled(ackReceiptEnabled);
+ ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
consumerBuilder.subscribe();
+
+ final int totalMsgs = 100;
+
+ for (int i = 0; i < totalMsgs; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
- int consumedCount = 0;
- Set<MessageId> messageIds = new HashSet<>();
- for (int i = 0; i < totalMsgs; i++) {
- Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
- if (message != null && (consumedCount % 2) == 0) {
- consumer1.acknowledge(message);
- } else {
- messageIds.add(message.getMessageId());
+ int consumedCount = 0;
+ Set<MessageId> messageIds = new HashSet<>();
+ for (int i = 0; i < totalMsgs; i++) {
+ Message<byte[]> message = consumer1.receive(5,
TimeUnit.SECONDS);
+ if (message != null && (consumedCount % 2) == 0) {
+ consumer1.acknowledge(message);
+ } else {
+ messageIds.add(message.getMessageId());
+ }
+ consumedCount += 1;
}
- consumedCount += 1;
- }
- assertEquals(totalMsgs, consumedCount);
-
- // redeliver all unack messages
- consumer1.redeliverUnacknowledgedMessages(messageIds);
-
- MessageIdImpl lastMsgId = null;
- for (int i = 0; i < totalMsgs / 2; i++) {
- Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
- MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
- if (lastMsgId != null) {
- assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(),
"lastMsgId: "
- + lastMsgId + " -- msgId: " + msgId);
+ assertEquals(totalMsgs, consumedCount);
+
+ // redeliver all unack messages
+ consumer1.redeliverUnacknowledgedMessages(messageIds);
+
+ MessageIdImpl lastMsgId = null;
+ for (int i = 0; i < totalMsgs / 2; i++) {
+ Message<byte[]> message = consumer1.receive(5,
TimeUnit.SECONDS);
+ MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
+ if (lastMsgId != null) {
+ assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(),
"lastMsgId: "
+ + lastMsgId + " -- msgId: " + msgId);
+ }
+ lastMsgId = msgId;
}
- lastMsgId = msgId;
- }
- // close consumer so, this consumer's unack messages will be
redelivered to new consumer
- consumer1.close();
-
- @Cleanup
- Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
- lastMsgId = null;
- for (int i = 0; i < totalMsgs / 2; i++) {
- Message<byte[]> message = consumer2.receive(5, TimeUnit.SECONDS);
- MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
- if (lastMsgId != null) {
- assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
+ // close consumer so, this consumer's unack messages will be
redelivered to new consumer
+ consumer1.close();
+
+ @Cleanup
+ Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+ lastMsgId = null;
+ for (int i = 0; i < totalMsgs / 2; i++) {
+ Message<byte[]> message = consumer2.receive(5,
TimeUnit.SECONDS);
+ MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
+ if (lastMsgId != null) {
+ assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
+ }
+ lastMsgId = msgId;
}
- lastMsgId = msgId;
+ } finally {
+ getConfig().setManagedLedgerMaxEntriesPerLedger(origMaxEntries);
+
getConfig().setManagedLedgerMinLedgerRolloverTimeMinutes(origMinRollover);
}
}
@Test(dataProvider = "ackReceiptEnabled")
public void testUnAckMessageRedeliveryWithReceiveAsync(boolean
ackReceiptEnabled)
throws PulsarClientException, ExecutionException,
InterruptedException {
- String topic = "persistent://my-property/my-ns/async-unack-redelivery";
+ String topic = newTopicName();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("s1")
@@ -222,12 +211,10 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
@Test
public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
final int queueSize = 10;
int batchSize = 100;
String subName = "my-subscriber-name";
- String topicName = "permitReceiveBatchMessages" +
(UUID.randomUUID().toString());
+ String topicName = newTopicName();
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
.subscribe();
@@ -251,27 +238,22 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
}
producer.flush();
- retryStrategically((test) -> {
- return consumer1.getTotalIncomingMessages() == batchSize;
- }, 5, 2000);
-
- assertEquals(consumer1.getTotalIncomingMessages(), batchSize);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(consumer1.getTotalIncomingMessages(), batchSize));
ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.receiverQueueSize(queueSize).subscriptionType(SubscriptionType.Shared).subscriptionName(subName)
.subscribe();
- retryStrategically((test) -> {
- return consumer2.getTotalIncomingMessages() == queueSize;
- }, 5, 2000);
- assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
- log.info("-- Exiting {} test --", methodName);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(consumer2.getTotalIncomingMessages(), queueSize));
}
@Test(timeOut = 30000)
public void testMessageRedeliveryWhenTimeoutInListener() throws Exception {
+
final String subName =
"sub_testMessageRedeliveryWhenTimeoutInListener";
- final String topicName = "testMessageRedeliveryWhenTimeoutInListener"
+ UUID.randomUUID();
+ final String topicName = newTopicName();
final int messages = 10;
@@ -326,7 +308,7 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
public void testMessageRedeliveryAfterUnloadedWithEarliestPosition()
throws Exception {
final String subName = "my-subscriber-name";
- final String topicName =
"testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
+ final String topicName = newTopicName();
final int messages = 100;
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -376,8 +358,7 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
@Test(timeOut = 30000, dataProvider = "batchedMessageAck")
public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType
ackType) throws Exception {
- String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
- + numAcked + "-" + batchSize + "-" + ackType.getValue();
+ String topic = newTopicName();
@Cleanup Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
@@ -394,30 +375,30 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
String value = "msg-" + i;
producer.sendAsync(value).thenAccept(id -> log.info("{} was sent
to {}", value, id));
}
- List<Message<String>> messages = new ArrayList<>();
+ List<Message<String>> msgs = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
- messages.add(consumer.receive());
+ msgs.add(consumer.receive());
}
if (ackType == CommandAck.AckType.Individual) {
for (int i = 0; i < numAcked; i++) {
- consumer.acknowledge(messages.get(i));
+ consumer.acknowledge(msgs.get(i));
}
} else {
- consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+ consumer.acknowledgeCumulative(msgs.get(numAcked - 1));
}
consumer.redeliverUnacknowledgedMessages();
- messages.clear();
+ msgs.clear();
for (int i = 0; i < batchSize; i++) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("Received {} from {}", msg.getValue(),
msg.getMessageId());
- messages.add(msg);
+ msgs.add(msg);
}
- List<String> values =
messages.stream().map(Message::getValue).collect(Collectors.toList());
+ List<String> values =
msgs.stream().map(Message::getValue).collect(Collectors.toList());
// All messages are redelivered because only if the whole batch are
acknowledged would the message ID be
// added into the ACK tracker.
if (numAcked < batchSize) {
@@ -429,7 +410,7 @@ public class ConsumerRedeliveryTest extends
ProducerConsumerBase {
@Test
public void testRedeliverMessagesWithoutValue() throws Exception {
- String topic =
"persistent://my-property/my-ns/testRedeliverMessagesWithoutValue";
+ String topic = newTopicName();
@Cleanup Consumer<Integer> consumer =
pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 832e814e4ee..31c92583a90 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -51,7 +51,7 @@ import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -66,30 +66,14 @@ import
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class DeadLetterTopicTest extends ProducerConsumerBase {
+public class DeadLetterTopicTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(DeadLetterTopicTest.class);
- @BeforeMethod(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- this.conf.setMaxMessageSize(5 * 1024);
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
@@ -101,7 +85,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicWithMessageKey() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -118,9 +102,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -161,7 +145,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -178,9 +162,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -222,7 +206,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -239,9 +223,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -283,7 +267,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicMessagesWithEventTime() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -300,9 +284,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -344,11 +328,13 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
}
public void testDeadLetterTopicWithProducerName() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final String subscription = "my-subscription";
final String consumerName = "my-consumer";
+ // Extract the short topic name from the full topic for matching
+ String shortTopicName = topic.substring(topic.lastIndexOf('/') + 1);
Pattern deadLetterProducerNamePattern =
-
Pattern.compile("^persistent://my-property/my-ns/dead-letter-topic"
+ Pattern.compile("^persistent://" +
Pattern.quote(getNamespace()) + "/" + Pattern.quote(shortTopicName)
+ "-my-subscription"
+ "-my-consumer"
+ "-[a-zA-Z0-9]{5}"
@@ -370,9 +356,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -413,7 +399,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testMultipleSameNameConsumersToDeadLetterTopic() throws
Exception {
- final String topic =
"persistent://my-property/my-ns/same-name-consumers-dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
final int messageCount = 10;
final int consumerCount = 3;
@@ -492,7 +478,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(dataProvider = "produceLargeMessages")
public void testDeadLetterTopic(boolean produceLargeMessages) throws
Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
@@ -509,9 +495,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -579,7 +565,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(timeOut = 20000)
public void testDeadLetterTopicHasOriginalInfo() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
final int sendMessages = 10;
@@ -595,9 +581,9 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .topic(topic + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -655,13 +641,13 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(timeOut = 20000)
public void testAutoConsumeSchemaDeadLetter() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final String subName = "my-subscription";
final int maxRedeliveryCount = 1;
final int sendMessages = 10;
admin.topics().createNonPartitionedTopic(topic);
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<FooV2> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.AVRO(FooV2.class))
.topic(topic + "-" + subName + "-DLQ")
.subscriptionName("my-subscription")
@@ -725,7 +711,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
final int messageCount = 10;
final int consumerCount = 3;
@@ -801,8 +787,8 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
*/
@Test(enabled = false)
public void testDeadLetterTopicWithMultiTopic() throws Exception {
- final String topic1 =
"persistent://my-property/my-ns/dead-letter-topic-1";
- final String topic2 =
"persistent://my-property/my-ns/dead-letter-topic-2";
+ final String topic1 = newTopicName();
+ final String topic2 = newTopicName();
final int maxRedeliveryCount = 2;
@@ -821,8 +807,8 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
// subscribe to the DLQ topics before consuming original topics
Consumer<byte[]> deadLetterConsumer =
pulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ",
-
"persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
+ .topic(topic1 + "-my-subscription-DLQ",
+ topic2 + "-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -883,9 +869,11 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test(groups = "quarantine")
public void testDeadLetterTopicByCustomTopicName() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
final int sendMessages = 100;
+ final String customDlqTopic = "persistent://" + getNamespace()
+ + "/dead-letter-custom-topic-my-subscription-custom-DLQ";
// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
@@ -896,15 +884,14 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
- .deadLetterTopic("persistent://my-property/my-ns/"
- +
"dead-letter-custom-topic-my-subscription-custom-DLQ")
+ .deadLetterTopic(customDlqTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
-
.topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+ .topic(customDlqTopic)
.subscriptionName("my-subscription")
.subscribe();
@@ -933,7 +920,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
deadLetterConsumer.close();
consumer.close();
@Cleanup
- PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient1 = newPulsarClient();
Consumer<byte[]> checkConsumer =
newPulsarClient1.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
@@ -954,7 +941,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
*/
@Test(timeOut = 200000)
public void testDeadLetterWithoutConsumerReceiveImmediately() throws
PulsarClientException, InterruptedException {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";
+ final String topic = newTopicName();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
@@ -979,7 +966,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicUnderPartitionedTopicWithKeyShareType()
throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic-with-partitioned-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
@@ -1001,15 +988,13 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
Consumer<byte[]> deadLetterConsumer0 =
pulsarClient.newConsumer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/"
- +
"dead-letter-topic-with-partitioned-topic-partition-0-my-subscription-DLQ")
+ .topic(topic + "-partition-0-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Consumer<byte[]> deadLetterConsumer1 =
pulsarClient.newConsumer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/"
- +
"dead-letter-topic-with-partitioned-topic-partition-1-my-subscription-DLQ")
+ .topic(topic + "-partition-1-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -1080,7 +1065,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicWithInitialSubscription() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -1103,7 +1088,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
@@ -1125,10 +1110,10 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
- String deadLetterTopic =
"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
+ String deadLetterTopic = topic + "-my-subscription-DLQ";
Awaitility.await().atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-
assertTrue(admin.namespaces().getTopics("my-property/my-ns").contains(deadLetterTopic));
+
assertTrue(admin.namespaces().getTopics(getNamespace()).contains(deadLetterTopic));
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
});
@@ -1153,7 +1138,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test()
public void testDeadLetterTopicWithProducerBuilder() throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic-with-producer-builder";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 2;
final int sendMessages = 100;
@@ -1180,7 +1165,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
.topic(topic + "-" + subscriptionName + "-DLQ")
.subscriptionName(subscriptionNameDLQ)
@@ -1283,7 +1268,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers()
throws Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+ final String topic = newTopicName();
final int maxRedeliveryCount = 1;
@@ -1319,7 +1304,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.subscribe();
@Cleanup
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ PulsarClient newPulsarClient = newPulsarClient();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
@@ -1336,10 +1321,10 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
consumerReceiveForDLQ(otherConsumer, totalReceived,
sendMessages, maxRedeliveryCount))
.get(10, TimeUnit.SECONDS);
- String deadLetterTopic =
"persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ";
+ String deadLetterTopic = topic + "-my-subscription-DLQ";
Awaitility.await().atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-
assertTrue(admin.namespaces().getTopics("my-property/my-ns").contains(deadLetterTopic));
+
assertTrue(admin.namespaces().getTopics(getNamespace()).contains(deadLetterTopic));
assertTrue(admin.topics().getSubscriptions(deadLetterTopic).contains(dlqInitialSub));
});
@@ -1396,7 +1381,6 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
long number;
public PayloadIncompatible() {
-
}
public PayloadIncompatible(long number) {
@@ -1407,8 +1391,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
// reproduce issue reported in
https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
@Test
public void
testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws
Exception {
- String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
- admin.namespaces().createNamespace(namespace);
+ String namespace = getNamespace();
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
@@ -1416,8 +1399,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible =
Schema.AVRO(PayloadIncompatible.class);
- String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
- +
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+ String topic = newTopicName();
String dlqTopic = topic + "-DLQ";
// create topics
@@ -1447,7 +1429,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Thread.sleep(2000L);
-
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+ assertThat(getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be <=
1 so that it doesn't leak producers",
dlqTopic)
.isLessThanOrEqualTo(1);
@@ -1462,7 +1444,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
}
}
-
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+ assertThat(getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be 0 here",
dlqTopic)
.isEqualTo(0);
@@ -1470,8 +1452,8 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
@Test
public void testDeadLetterTopicWithMaxUnackedMessagesBlocking() throws
Exception {
- final String topic =
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking";
- final String dlq =
"persistent://my-property/my-ns/dead-letter-topic-unacked-blocking-my-subscription-DLQ";
+ final String topic = newTopicName();
+ final String dlq = topic + "-my-subscription-DLQ";
final int maxRedeliveryCount = 3;
final int maxUnackedMessages = 100;
final int sendMessages = 1000;
@@ -1565,14 +1547,12 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
// reproduce issue reported in
https://github.com/apache/pulsar/issues/24541
@Test
public void sendDeadLetterTopicWithMismatchSchemaProducer() throws
Exception {
- String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
- admin.namespaces().createNamespace(namespace);
+ String namespace = getNamespace();
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
- final String topic = BrokerTestUtil.newUniqueName("persistent://" +
namespace
- + "/sendDeadLetterTopicWithMismatchSchemaProducer");
+ final String topic = newTopicName();
final String retryTopic = topic + "-RETRY";
final String deadLetterTopic = topic + "-DLQ";
final Long deadLetterMessageValue = 1234567890L;
@@ -1606,7 +1586,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
Thread.sleep(3000L);
- assertThat(pulsar.getBrokerService().getTopicReference(topic).get()
+ assertThat(getTopicReference(topic).get()
.getSubscription(subscriptionName).getConsumers().get(0).getMessageRedeliverCounter())
.describedAs("redeliver count of topic %s should be less than
or equal to 2 because of mismatch schema",
topic)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
index 48089aae7c2..238ab5b7577 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -25,37 +25,23 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
+public class PatternMultiTopicsConsumerTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(PatternMultiTopicsConsumerTest.class);
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- isTcpLookup = true;
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test(timeOut = 5000)
public void testSimple() throws Exception {
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/topic.*")
+ String ns = getNamespace();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern("persistent://" + ns + "/topic.*")
// Make sure topics are discovered before test times out
.patternAutoDiscoveryPeriod(2, TimeUnit.SECONDS)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -65,7 +51,9 @@ public class PatternMultiTopicsConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 5000)
public void testNotifications() throws Exception {
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/topic.*")
+ String ns = getNamespace();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topicsPattern("persistent://" + ns + "/topic.*")
// Set auto-discovery period high so that only notifications
can inform us about new topics
.patternAutoDiscoveryPeriod(1, TimeUnit.MINUTES)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
@@ -74,9 +62,10 @@ public class PatternMultiTopicsConsumerTest extends
ProducerConsumerBase {
}
private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
+ String ns = getNamespace();
Map<String, List<String>> sentMessages = new HashMap<>();
for (int p = 0; p < 10; ++p) {
- String name = "persistent://my-property/my-ns/topic-" + p;
+ String name = "persistent://" + ns + "/topic-" + p;
Producer<byte[]> producer =
pulsarClient.newProducer().topic(name).create();
for (int i = 0; i < 10; i++) {
String message = "message-" + p + i;
@@ -100,9 +89,10 @@ public class PatternMultiTopicsConsumerTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testFailedSubscribe() throws Exception {
- final String topicName1 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
- final String topicName2 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
- final String topicName3 =
BrokerTestUtil.newUniqueName("persistent://public/default/tp_test");
+ String ns = getNamespace();
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://"
+ ns + "/tp_test");
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://"
+ ns + "/tp_test");
+ final String topicName3 = BrokerTestUtil.newUniqueName("persistent://"
+ ns + "/tp_test");
final String subName = "s1";
admin.topics().createPartitionedTopic(topicName1, 2);
admin.topics().createPartitionedTopic(topicName2, 3);
@@ -116,7 +106,7 @@ public class PatternMultiTopicsConsumerTest extends
ProducerConsumerBase {
try {
PatternMultiTopicsConsumerImpl<String> consumer =
(PatternMultiTopicsConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
- .topicsPattern("persistent://public/default/tp_test.*")
+ .topicsPattern("persistent://" + ns + "/tp_test.*")
.subscriptionType(SubscriptionType.Failover)
.subscriptionName(subName)
.subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
index 1726101929b..fbd595f9bba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
@@ -23,50 +23,27 @@ import static
org.apache.pulsar.client.api.SubscriptionType.Failover;
import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
import static org.apache.pulsar.client.api.SubscriptionType.Shared;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class UnloadSubscriptionTest extends ProducerConsumerBase {
-
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- conf.setSystemTopicEnabled(false);
- conf.setTransactionCoordinatorEnabled(false);
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class UnloadSubscriptionTest extends SharedPulsarBaseTest {
@DataProvider(name = "unloadCases")
public Object[][] unloadCases (){
@@ -94,7 +71,7 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
@Test(dataProvider = "unloadCases")
public void testSingleConsumer(int msgCount, boolean enabledBatch, int
maxMsgPerBatch, SubscriptionType subType,
int ackMsgCount) throws Exception {
- final String topicName = "persistent://my-property/my-ns/tp-" +
UUID.randomUUID();
+ final String topicName = newTopicName();
final String subName = "sub";
Consumer<String> consumer = createConsumer(topicName, subName,
subType);
ProducerAndMessageIds producerAndMessageIds =
@@ -103,8 +80,7 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
toString(producerAndMessageIds.messageIds));
// Receive all messages and ack some.
- MessagesEntry messagesEntry = receiveAllMessages(consumer);
- assertEquals(messagesEntry.messageSet.size(), msgCount);
+ MessagesEntry messagesEntry = receiveMessages(consumer, msgCount);
if (ackMsgCount > 0){
LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
Iterator<MessageId> messageIdIterator =
messagesEntry.messageIdSet.iterator();
@@ -115,20 +91,18 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
log.info("ack message-ids: {}",
toString(ackedMessageIds.stream().toList()));
}
-
// Unload subscriber.
PersistentTopic persistentTopic = getPersistentTopic(topicName);
persistentTopic.unloadSubscription(subName);
// Receive all messages for the second time.
- MessagesEntry messagesEntryForTheSecondTime =
receiveAllMessages(consumer);
+ int expectedAfterUnload = msgCount - ackMsgCount;
+ MessagesEntry messagesEntryForTheSecondTime =
receiveMessages(consumer, expectedAfterUnload);
log.info("received message-ids for the second time: {}",
toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
- assertEquals(messagesEntryForTheSecondTime.messageSet.size(), msgCount
- ackMsgCount);
// cleanup.
producerAndMessageIds.producer.close();
consumer.close();
- admin.topics().delete(topicName);
}
@Test(dataProvider = "unloadCases")
@@ -137,7 +111,7 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
if (subType == Exclusive){
return;
}
- final String topicName = "persistent://my-property/my-ns/tp-" +
UUID.randomUUID();
+ final String topicName = newTopicName();
final String subName = "sub";
Consumer<String> consumer1 = createConsumer(topicName, subName,
subType);
Consumer<String> consumer2 = createConsumer(topicName, subName,
subType);
@@ -147,18 +121,11 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
toString(producerAndMessageIds.messageIds));
// Receive all messages and ack some.
- MessagesEntry messagesEntry1 = receiveAllMessages(consumer1);
- MessagesEntry messagesEntry2 = receiveAllMessages(consumer2);
- LinkedHashSet<String> allMessages = new LinkedHashSet<>();
- allMessages.addAll(messagesEntry1.messageSet);
- allMessages.addAll(messagesEntry2.messageSet);
- assertEquals(allMessages.size(), msgCount);
+ List<Consumer<String>> consumers = List.of(consumer1, consumer2);
+ MessagesEntry messagesEntry = receiveMessages(consumers, msgCount);
if (ackMsgCount > 0){
- LinkedHashSet<MessageId> allMessageIds = new LinkedHashSet<>();
LinkedHashSet<MessageId> ackedMessageIds = new LinkedHashSet<>();
- allMessageIds.addAll(messagesEntry1.messageIdSet);
- allMessageIds.addAll(messagesEntry2.messageIdSet);
- Iterator<MessageId> messageIdIterator = allMessageIds.iterator();
+ Iterator<MessageId> messageIdIterator =
messagesEntry.messageIdSet.iterator();
for (int i = ackMsgCount; i > 0; i--){
ackedMessageIds.add(messageIdIterator.next());
}
@@ -171,23 +138,16 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
persistentTopic.unloadSubscription(subName);
// Receive all messages for the second time.
- MessagesEntry messagesEntryForTheSecondTime1 =
receiveAllMessages(consumer1);
- MessagesEntry messagesEntryForTheSecondTime2 =
receiveAllMessages(consumer2);
- LinkedHashSet<String> allMessagesForTheSecondTime = new
LinkedHashSet<>();
-
allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime1.messageSet);
-
allMessagesForTheSecondTime.addAll(messagesEntryForTheSecondTime2.messageSet);
- LinkedHashSet<MessageId> allMessageIdsForTheSecondTime = new
LinkedHashSet<>();
- allMessageIdsForTheSecondTime.addAll(messagesEntry1.messageIdSet);
- allMessageIdsForTheSecondTime.addAll(messagesEntry2.messageIdSet);
+ int expectedAfterUnload = msgCount - ackMsgCount;
+ MessagesEntry messagesEntryForTheSecondTime =
receiveMessages(consumers, expectedAfterUnload);
log.info("received message-ids for the second time: {}",
- toString(allMessageIdsForTheSecondTime.stream().toList()));
- assertEquals(allMessagesForTheSecondTime.size(), msgCount -
ackMsgCount);
+
toString(messagesEntryForTheSecondTime.messageIdSet.stream().toList()));
+ assertEquals(messagesEntryForTheSecondTime.messageSet.size(),
expectedAfterUnload);
// cleanup.
producerAndMessageIds.producer.close();
consumer1.close();
consumer2.close();
- admin.topics().delete(topicName);
}
private static String toString(List<MessageId> messageIds){
@@ -214,7 +174,7 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
}
private PersistentTopic getPersistentTopic(String topicName) {
- return (PersistentTopic) pulsar.getBrokerService().getTopic(topicName,
false).join().get();
+ return (PersistentTopic) getTopic(topicName, false).join().get();
}
private ProducerAndMessageIds createProducerAndSendMessages(String
topicName, int msgCount, boolean enabledBatch,
@@ -246,20 +206,64 @@ public class UnloadSubscriptionTest extends
ProducerConsumerBase {
return consumer;
}
- private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws
Exception {
- final Set<String> messageSet = Collections.synchronizedSet(new
LinkedHashSet<>());
- final Set<MessageId> messageIdSet = Collections.synchronizedSet(new
LinkedHashSet<>());
- while (true) {
- Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
- if (msg == null){
- break;
- }
+ private MessagesEntry receiveMessages(Consumer<String> consumer, int
expectedCount) throws Exception {
+ final LinkedHashSet<String> messageSet = new LinkedHashSet<>();
+ final LinkedHashSet<MessageId> messageIdSet = new LinkedHashSet<>();
+ for (int i = 0; i < expectedCount; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg, "Expected " + expectedCount + " messages but
only received " + i);
messageIdSet.add(msg.getMessageId());
messageSet.add(msg.getValue());
}
return new MessagesEntry(messageSet, messageIdSet);
}
- private record MessagesEntry(Set<String> messageSet, Set<MessageId>
messageIdSet) {}
+ private MessagesEntry receiveMessages(List<Consumer<String>> consumers,
int expectedCount) throws Exception {
+ final LinkedHashSet<String> messageSet = new LinkedHashSet<>();
+ final LinkedHashSet<MessageId> messageIdSet = new LinkedHashSet<>();
+ // Use receiveAsync on all consumers concurrently and collect until we
have enough
+ List<CompletableFuture<Message<String>>> pending = new ArrayList<>();
+ for (Consumer<String> c : consumers) {
+ pending.add(c.receiveAsync());
+ }
+ long deadline = System.currentTimeMillis() + 30_000;
+ while (messageSet.size() < expectedCount && System.currentTimeMillis()
< deadline) {
+ CompletableFuture<Object> any =
CompletableFuture.anyOf(pending.toArray(new CompletableFuture[0]));
+ long remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ break;
+ }
+ try {
+ any.get(remaining, TimeUnit.MILLISECONDS);
+ } catch (java.util.concurrent.TimeoutException e) {
+ break;
+ }
+ // Collect all completed futures
+ List<CompletableFuture<Message<String>>> newPending = new
ArrayList<>();
+ for (int i = 0; i < pending.size(); i++) {
+ CompletableFuture<Message<String>> f = pending.get(i);
+ if (f.isDone()) {
+ Message<String> msg = f.join();
+ messageIdSet.add(msg.getMessageId());
+ messageSet.add(msg.getValue());
+ if (messageSet.size() < expectedCount) {
+ newPending.add(consumers.get(i).receiveAsync());
+ }
+ } else {
+ newPending.add(f);
+ }
+ }
+ pending = newPending;
+ }
+ assertEquals(messageSet.size(), expectedCount,
+ "Expected " + expectedCount + " messages but received " +
messageSet.size());
+ // Cancel any remaining pending futures
+ for (CompletableFuture<Message<String>> f : pending) {
+ f.cancel(false);
+ }
+ return new MessagesEntry(messageSet, messageIdSet);
+ }
+
+ private record MessagesEntry(LinkedHashSet<String> messageSet,
LinkedHashSet<MessageId> messageIdSet) {}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index ea399af2bc2..5016c0eee77 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -38,31 +38,17 @@ import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test
-public class MessageIdTest extends BrokerTestBase {
+public class MessageIdTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(MessageIdTest.class);
- @BeforeMethod
- @Override
- public void setup() throws Exception {
- baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
-
@Test(timeOut = 10000, dataProviderClass = EnumValuesDataProvider.class,
dataProvider = "values")
public void producerSendAsync(TopicType topicType) throws
PulsarClientException, PulsarAdminException {
// Given
String key = "producerSendAsync-" + topicType;
- final String topicName = "persistent://my-property/my-ns/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
@@ -129,7 +115,7 @@ public class MessageIdTest extends BrokerTestBase {
public void producerSend(TopicType topicType) throws
PulsarClientException, PulsarAdminException {
// Given
String key = "producerSend-" + topicType;
- final String topicName = "persistent://my-property/my-ns/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-subscription-" + key;
final String messagePrefix = "my-message-" + key + "-";
final int numberOfMessages = 30;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index eddcd393a8b..5b38af8637d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -29,32 +29,18 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase {
+public class PerMessageUnAcknowledgedRedeliveryTest extends
SharedPulsarBaseTest {
private static final long testTimeout = 90000; // 1.5 min
private static final Logger log =
LoggerFactory.getLogger(PerMessageUnAcknowledgedRedeliveryTest.class);
private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
- @Override
- @BeforeMethod
- public void setup() throws Exception {
- super.baseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test(timeOut = testTimeout)
public void testSharedAckedNormalTopic() throws Exception {
String key = "testSharedAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
@@ -152,7 +138,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
@Test(timeOut = testTimeout)
public void testUnAckedMessageTrackerSize() throws Exception {
String key = "testUnAckedMessageTrackerSize";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
@@ -194,7 +180,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
@Test(timeOut = testTimeout)
public void testExclusiveAckedNormalTopic() throws Exception {
String key = "testExclusiveAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
@@ -292,7 +278,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
@Test(timeOut = testTimeout)
public void testFailoverAckedNormalTopic() throws Exception {
String key = "testFailoverAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
@@ -396,7 +382,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
@Test(timeOut = testTimeout)
public void testSharedAckedPartitionedTopic() throws Exception {
String key = "testSharedAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
+ final String topicName = "persistent://" + getNamespace() + "/topic-"
+ key;
final String subscriptionName = "my-ex-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 15;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index e3bd36a8914..87f2a9b131d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -25,10 +25,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -36,26 +35,11 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class ProducerCloseTest extends ProducerConsumerBase {
-
- @Override
- @BeforeMethod
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ProducerCloseTest extends SharedPulsarBaseTest {
/**
* Param1: Producer enableBatch or not
@@ -85,10 +69,11 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
@Test(dataProvider = "brokenPipeline")
public void testProducerCloseCallback2(boolean brokenPipeline) throws
Exception {
- initClient();
@Cleanup
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
- .topic("testProducerClose")
+ PulsarClient client = newPulsarClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
client.newProducer()
+ .topic(newTopicName())
.sendTimeout(5, TimeUnit.SECONDS)
.maxPendingMessages(0)
.enableBatching(false)
@@ -100,23 +85,22 @@ public class ProducerCloseTest extends
ProducerConsumerBase {
producer.closeAsync();
Thread.sleep(3000);
if (brokenPipeline) {
- //producer.getClientCnx().channel().config().setAutoRead(true);
producer.getClientCnx().channel().close();
} else {
producer.getClientCnx().channel().config().setAutoRead(true);
}
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
- System.out.println(1);
Assert.assertTrue(completableFuture.isDone());
});
}
@Test(timeOut = 10_000)
public void testProducerCloseCallback() throws Exception {
- initClient();
@Cleanup
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
- .topic("testProducerClose")
+ PulsarClient client = newPulsarClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
client.newProducer()
+ .topic(newTopicName())
.sendTimeout(5, TimeUnit.SECONDS)
.maxPendingMessages(0)
.enableBatching(false)
@@ -127,7 +111,7 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
final CompletableFuture<MessageId> completableFuture =
value.sendAsync();
producer.closeAsync();
final CommandSuccess commandSuccess = new CommandSuccess();
- PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ PulsarClientImpl clientImpl = (PulsarClientImpl) client;
commandSuccess.setRequestId(clientImpl.newRequestId() - 1);
producer.getClientCnx().handleSuccess(commandSuccess);
Thread.sleep(3000);
@@ -136,10 +120,11 @@ public class ProducerCloseTest extends
ProducerConsumerBase {
@Test(timeOut = 10_000)
public void
testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws
Exception {
- initClient();
@Cleanup
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
- .topic("testProducerClose")
+ PulsarClient client = newPulsarClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
client.newProducer()
+ .topic(newTopicName())
.maxPendingMessages(10)
.batchingMaxPublishDelay(10, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
@@ -165,9 +150,9 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
PulsarClient longBackOffClient = PulsarClient.builder()
.startingBackoffInterval(5, TimeUnit.SECONDS)
.maxBackoffInterval(5, TimeUnit.SECONDS)
- .serviceUrl(lookupUrl.toString())
+ .serviceUrl(getBrokerServiceUrl())
.build();
- String topic = "broker-close-test-" +
RandomStringUtils.randomAlphabetic(5);
+ String topic = newTopicName();
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
longBackOffClient.newProducer()
.topic(topic)
@@ -175,8 +160,7 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
.create();
producer.newMessage().value("test".getBytes()).send();
- Optional<Topic> topicOptional = pulsar.getBrokerService()
-
.getTopicReference(TopicName.get(topic).getPartitionedTopicName());
+ Optional<Topic> topicOptional =
getTopicReference(TopicName.get(topic).getPartitionedTopicName());
Assert.assertTrue(topicOptional.isPresent());
topicOptional.get().close(true).get();
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(producer.getState(), HandlerState.State.Connecting));
@@ -186,10 +170,4 @@ public class ProducerCloseTest extends
ProducerConsumerBase {
producer.newMessage().value("test".getBytes()).send();
}
}
-
- private void initClient() throws PulsarClientException {
- replacePulsarClient(PulsarClient.builder().
- serviceUrl(lookupUrl.toString()));
- }
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index f528c827549..c71331026fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -19,46 +19,35 @@
package org.apache.pulsar.client.impl;
import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* Tests for not exists topic.
*/
@Test(groups = "broker-impl")
-public class TopicDoesNotExistsTest extends ProducerConsumerBase {
+public class TopicDoesNotExistsTest extends SharedPulsarBaseTest {
- @Override
- @BeforeClass
- public void setup() throws Exception {
- // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
- isTcpLookup = true;
- conf.setAllowAutoTopicCreation(false);
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterClass(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
+ @BeforeMethod(alwaysRun = true)
+ public void disableAutoTopicCreation() throws Exception {
+ admin.namespaces().setAutoTopicCreation(getNamespace(),
+
AutoTopicCreationOverride.builder().allowAutoTopicCreation(false).build());
}
@Test
public void testCreateProducerOnNotExistsTopic() throws
PulsarClientException, InterruptedException {
@Cleanup
- PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+ PulsarClient client =
PulsarClient.builder().serviceUrl(getBrokerServiceUrl()).build();
try {
- pulsarClient.newProducer()
- .topic("persistent://public/default/" +
UUID.randomUUID().toString())
+ client.newProducer()
+ .topic(newTopicName())
.sendTimeout(100, TimeUnit.MILLISECONDS)
.create();
Assert.fail("Create producer should failed while topic does not
exists.");
@@ -66,26 +55,29 @@ public class TopicDoesNotExistsTest extends
ProducerConsumerBase {
Assert.assertTrue(e instanceof
PulsarClientException.TopicDoesNotExistException);
}
Thread.sleep(2000);
- HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
client).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
- Assert.assertEquals(((PulsarClientImpl)
pulsarClient).producersCount(), 0);
+ Assert.assertEquals(((PulsarClientImpl) client).producersCount(), 0);
}
@Test
public void testCreateConsumerOnNotExistsTopic() throws
PulsarClientException, InterruptedException {
@Cleanup
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .operationTimeout(1, TimeUnit.SECONDS)
+ .build();
try {
- pulsarClient.newConsumer()
- .topic("persistent://public/default/" +
UUID.randomUUID().toString())
+ client.newConsumer()
+ .topic(newTopicName())
.subscriptionName("test")
.subscribe();
Assert.fail("Create consumer should failed while topic does not
exists.");
} catch (PulsarClientException ignore) {
}
Thread.sleep(2000);
- HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
client).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
- Assert.assertEquals(((PulsarClientImpl)
pulsarClient).consumersCount(), 0);
+ Assert.assertEquals(((PulsarClientImpl) client).consumersCount(), 0);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
index 3d0f696716e..092626db5c0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.processor;
-import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Arrays;
@@ -27,21 +26,17 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -50,24 +45,7 @@ import org.testng.annotations.Test;
*/
@Slf4j
@Test(groups = "broker-impl")
-public class MessagePayloadProcessorTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- admin.clusters().createCluster("test",
-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid"),
Sets.newHashSet("test")));
- admin.namespaces().createNamespace("public/default",
Sets.newHashSet("test"));
- }
-
- @AfterClass
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class MessagePayloadProcessorTest extends SharedPulsarBaseTest {
@DataProvider
public static Object[][] config() {
@@ -92,7 +70,7 @@ public class MessagePayloadProcessorTest extends
ProducerConsumerBase {
@Test(dataProvider = "config")
public void testDefaultProcessor(int numPartitions, boolean
enableBatching, int batchingMaxMessages)
throws Exception {
- final String topic = "testDefaultProcessor-" + numPartitions + "-" +
enableBatching + "-" + batchingMaxMessages;
+ final String topic = newTopicName();
final int numMessages = 10;
final String messagePrefix = "msg-";
@@ -188,8 +166,10 @@ public class MessagePayloadProcessorTest extends
ProducerConsumerBase {
@Test(dataProvider = "customBatchConfig")
public void testCustomProcessor(final int numMessages, final int
batchingMaxMessages) throws Exception {
- final String topic = "persistent://public/default/testCustomProcessor-"
- + numMessages + "-" + batchingMaxMessages;
+ // Disable dedup: CustomBatchProducer writes directly to managed
ledger without
+ // producer name/sequence ID, causing NPE in MessageDeduplication
+ admin.namespaces().setDeduplicationStatus(getNamespace(), false);
+ final String topic = newTopicName();
@Cleanup
final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
@@ -200,7 +180,7 @@ public class MessagePayloadProcessorTest extends
ProducerConsumerBase {
.subscribe();
final PersistentTopic persistentTopic =
- (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(topic).get().orElse(null);
+ (PersistentTopic) getTopicIfExists(topic).get().orElse(null);
Assert.assertNotNull(persistentTopic);
final String messagePrefix = "msg-";