This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5386cc0fa39d3930250480636374563f993c7c21 Author: Guillaume Nodet <[email protected]> AuthorDate: Fri Feb 5 09:16:20 2021 +0100 Fix failing pulsar unit test --- .../pulsar/PulsarConsumerAcknowledgementTest.java | 73 ++++++++++++---------- .../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 19 +++--- 2 files changed, 53 insertions(+), 39 deletions(-) diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java index 3019e8c..f0c3eb5 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java @@ -18,9 +18,9 @@ package org.apache.camel.component.pulsar; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.Endpoint; -import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.pulsar.utils.AutoConfiguration; @@ -43,27 +43,22 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.test.junit5.TestSupport.body; - public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { @RegisterExtension static PulsarService service = PulsarServiceFactory.createService(); private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerAcknowledgementTest.class); - private static final String TOPIC_URI = "persistent://public/default/camel-topic-1"; + private static final String TOPIC_URI = "persistent://public/default/camel-topic-"; - @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" - + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" - + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000" - + "&negativeAckRedeliveryDelayMicros=100000") private Endpoint from; - @EndpointInject("mock:result") private MockEndpoint to; private Producer<String> producer; + private static int topicId = 0; + public String getPulsarBrokerUrl() { return service.getPulsarBrokerUrl(); } @@ -77,16 +72,25 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { context.removeRoute("myRoute"); String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100); - producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create(); + String topicUri = PulsarConsumerAcknowledgementTest.TOPIC_URI + ++topicId; + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create(); + + from = context.getEndpoint("pulsar:" + topicUri + "?numberOfConsumers=1&subscriptionType=Exclusive" + + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000" + + "&negativeAckRedeliveryDelayMicros=100000"); + to = context.getEndpoint("mock:result", MockEndpoint.class); } @AfterEach - public void tearDownProducer() { + public void tearDownProducer() throws Exception { + from.close(); try { producer.close(); } catch (PulsarClientException e) { LOGGER.warn("Failed to close client: {}", e.getMessage(), e); } + context.removeRoute("myRoute"); } @Override @@ -115,12 +119,13 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { @Test public void testAcknowledge() throws Exception { - to.expectsNoDuplicates(body()); + to.expectedMessageCount(1); + to.expectedBodiesReceived("testAcknowledge: Hello World!"); context.addRoutes(new RouteBuilder() { @Override public void configure() { - from(from).routeId("myRoute").to(to).process(exchange -> { + from(from).routeId("testAcknowledge:myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); PulsarMessageReceipt receipt @@ -130,19 +135,20 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { } }); - producer.send("Hello World!"); + producer.send("testAcknowledge: Hello World!"); MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } @Test public void testAcknowledgeAsync() throws Exception { - to.expectsNoDuplicates(body()); + to.expectedMessageCount(1); + to.expectedBodiesReceived("testAcknowledgeAsync: Hello World!"); context.addRoutes(new RouteBuilder() { @Override public void configure() { - from(from).routeId("myRoute").to(to).process(exchange -> { + from(from).routeId("testAcknowledgeAsync:myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); PulsarMessageReceipt receipt @@ -157,51 +163,54 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { } }); - producer.send("Hello World!"); + producer.send("testAcknowledgeAsync: Hello World!"); MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } @Test public void testAcknowledgeCumulative() throws Exception { - to.expectsNoDuplicates(body()); + to.expectedMessageCount(2); + to.expectedBodiesReceived("testAcknowledgeCumulative: Hello World!", "testAcknowledgeCumulative: Hello World Again!"); context.addRoutes(new RouteBuilder() { @Override public void configure() { - from(from).routeId("myRoute").to(to).process(exchange -> { + from(from).routeId("testAcknowledgeCumulative:myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); // Ack the second message. The first will also be acked. - if (exchange.getIn().getBody().equals("Hello World Again!")) { + if (exchange.getIn().getBody().equals("testAcknowledgeCumulative: Hello World Again!")) { receipt.acknowledgeCumulative(); } }); } }); - producer.send("Hello World!"); - producer.send("Hello World Again!"); + producer.send("testAcknowledgeCumulative: Hello World!"); + producer.send("testAcknowledgeCumulative: Hello World Again!"); MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } @Test public void testAcknowledgeCumulativeAsync() throws Exception { - to.expectsNoDuplicates(body()); + to.expectedMessageCount(2); + to.expectedBodiesReceived("testAcknowledgeCumulativeAsync: Hello World!", + "testAcknowledgeCumulativeAsync: Hello World Again!"); context.addRoutes(new RouteBuilder() { @Override public void configure() { - from(from).routeId("myRoute").to(to).process(exchange -> { + from(from).routeId("testAcknowledgeCumulativeAsync:myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); // Ack the second message. The first will also be acked. - if (exchange.getIn().getBody().equals("Hello World Again!")) { + if (exchange.getIn().getBody().equals("testAcknowledgeCumulativeAsync: Hello World Again!")) { try { CompletableFuture<Void> f = receipt.acknowledgeCumulativeAsync(); f.get(); @@ -213,8 +222,8 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { } }); - producer.send("Hello World!"); - producer.send("Hello World Again!"); + producer.send("testAcknowledgeCumulativeAsync: Hello World!"); + producer.send("testAcknowledgeCumulativeAsync: Hello World Again!"); MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } @@ -222,16 +231,16 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { @Test public void testNegativeAcknowledge() throws Exception { to.expectedMessageCount(2); - to.expectedBodiesReceived("Hello World!", "Hello World!"); + to.expectedBodiesReceived("testNegativeAcknowledge: Hello World!", "testNegativeAcknowledge: Hello World!"); + AtomicBoolean processed = new AtomicBoolean(); context.addRoutes(new RouteBuilder() { @Override public void configure() { - from(from).routeId("myRoute").to(to).process(exchange -> { + from(from).routeId("testNegativeAcknowledge:myRoute").to(to).process(exchange -> { LOGGER.info("Processing message {}", exchange.getIn().getBody()); - if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) { - exchange.setProperty("processedOnce", "true"); + if (processed.compareAndSet(false, true)) { PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); receipt.negativeAcknowledge(); @@ -244,7 +253,7 @@ public class PulsarConsumerAcknowledgementTest extends CamelTestSupport { } }); - producer.newMessage().value("Hello World!").property("processedOnce", "false").send(); + producer.newMessage().value("testNegativeAcknowledge: Hello World!").send(); MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java index a5a7ad7..31dd28c 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java @@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerDeadLetterPolicyTest.class); - private static final String TOPIC_URI = "persistent://public/default/camel-topic"; + private static final String TOPIC_URI = "persistent://public/default/camel-topic-"; @EndpointInject("mock:result") private MockEndpoint to; @@ -51,6 +51,10 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { private Producer<String> producer; + private static int topicId = 0; + + private String topicUri; + @Override protected Registry createCamelRegistry() throws Exception { Registry registry = new SimpleRegistry(); @@ -81,7 +85,8 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { } String producerName = this.getClass().getSimpleName() + TestUtils.randomWithRange(1, 100); - producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(TOPIC_URI).create(); + topicUri = PulsarConsumerDeadLetterPolicyTest.TOPIC_URI + ++topicId; + producer = givenPulsarClient().newProducer(Schema.STRING).producerName(producerName).topic(topicUri).create(); } @AfterEach @@ -97,7 +102,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { public void givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws Exception { PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); - PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI); + PulsarEndpoint endpoint = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri); assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount()); assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic()); @@ -109,9 +114,9 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); PulsarEndpoint from = (PulsarEndpoint) component - .createEndpoint("pulsar:" + TOPIC_URI + .createEndpoint("pulsar:" + topicUri + "?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); - PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ"); + PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:" + topicUri + "-subs-DLQ"); to.expectedMessageCount(5); deadLetter.expectedMessageCount(1); @@ -134,7 +139,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); PulsarEndpoint from = (PulsarEndpoint) component - .createEndpoint("pulsar:" + TOPIC_URI + .createEndpoint("pulsar:" + topicUri + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic"); @@ -159,7 +164,7 @@ public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport { PulsarComponent component = context.getComponent("pulsar", PulsarComponent.class); PulsarEndpoint from = (PulsarEndpoint) component - .createEndpoint("pulsar:" + TOPIC_URI + .createEndpoint("pulsar:" + topicUri + "?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000"); PulsarEndpoint deadLetterFrom = (PulsarEndpoint) component.createEndpoint("pulsar:persistent://public/default/customTopic");
