This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2f48446d81cf9a76f7c99a066fd07cfec77bd6da Author: connormcauliffe-toast <connor.mcauli...@toasttab.com> AuthorDate: Fri Feb 21 11:50:25 2020 -0500 CAMEL-14607: support negative acknowledge in pulsar message receipt --- .../pulsar/DefaultPulsarMessageReceipt.java | 2 +- .../pulsar/PulsarConsumerAcknowledgementTest.java | 52 +++++++++++++++++----- .../PulsarConsumerNoAcknowledgementTest.java | 21 ++++----- .../pulsar/PulsarCustomMessageReceiptTest.java | 17 +++---- .../pulsar/PulsarNegativeAcknowledgementTest.java | 33 -------------- 5 files changed, 63 insertions(+), 62 deletions(-) diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java index 6d3a1cd..45f6671 100644 --- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java +++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java @@ -55,7 +55,7 @@ public class DefaultPulsarMessageReceipt implements PulsarMessageReceipt { @Override public void negativeAcknowledge() { - throw new UnsupportedOperationException("Negative acknowledge is not supported in this version of the Pulsar client."); + consumer.negativeAcknowledge(messageId); } public Consumer getConsumer() { diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java index 1d34ff1..27b2496 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java @@ -25,7 +25,8 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.pulsar.utils.AutoConfiguration; import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; -import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -43,11 +44,11 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { private static final String TOPIC_URI = "persistent://public/default/camel-topic"; private static final String PRODUCER = "camel-producer-1"; - @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000") private Endpoint from; - @EndpointInject(uri = "mock:result") + @EndpointInject("mock:result") private MockEndpoint to; private Producer<String> producer; @@ -59,23 +60,23 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { } @Override - protected JndiRegistry createRegistry() throws Exception { - JndiRegistry jndi = super.createRegistry(); + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); - registerPulsarBeans(jndi); + registerPulsarBeans(registry); - return jndi; + return registry; } - private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException { + private void registerPulsarBeans(final Registry registry) throws PulsarClientException { PulsarClient pulsarClient = givenPulsarClient(); AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); - jndi.bind("pulsarClient", pulsarClient); + registry.bind("pulsarClient", pulsarClient); PulsarComponent comp = new PulsarComponent(context); comp.setAutoConfiguration(autoConfiguration); comp.setPulsarClient(pulsarClient); - jndi.bind("pulsar", comp); + registry.bind("pulsar", comp); } private PulsarClient givenPulsarClient() throws PulsarClientException { @@ -183,4 +184,35 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport { MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); } + + @Test + public void testNegativeAcknowledge() throws Exception { + to.expectedMessageCount(2); + to.expectedBodiesReceived("Hello World!", "Hello World!"); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("myRoute").to(to).process(exchange -> { + LOGGER.info("Processing message {}", exchange.getIn().getBody()); + + if (!Boolean.parseBoolean(exchange.getProperty("processedOnce", String.class))) { + exchange.setProperty("processedOnce", "true"); + PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() + .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + receipt.negativeAcknowledge(); + } + else { + PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn() + .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT); + receipt.acknowledge(); + } + }); + } + }); + + producer.newMessage().value("Hello World!").property("proccessedOnce", "false").send(); + + MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to); + } } diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java index f57b4a5..ccbec4f 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java @@ -23,7 +23,8 @@ import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.pulsar.utils.AutoConfiguration; -import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -36,11 +37,11 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport { private static final String TOPIC_URI = "persistent://public/default/camel-topic"; private static final String PRODUCER = "camel-producer-1"; - @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + @EndpointInject("pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&ackTimeoutMillis=1000") private Endpoint from; - @EndpointInject(uri = "mock:result") + @EndpointInject("mock:result") private MockEndpoint to; @Override @@ -55,26 +56,26 @@ public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport { } @Override - protected JndiRegistry createRegistry() throws Exception { - JndiRegistry jndi = super.createRegistry(); + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); - registerPulsarBeans(jndi); + registerPulsarBeans(registry); - return jndi; + return registry; } - private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException { + private void registerPulsarBeans(final Registry registry) throws PulsarClientException { PulsarClient pulsarClient = givenPulsarClient(); AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); - jndi.bind("pulsarClient", pulsarClient); + registry.bind("pulsarClient", pulsarClient); PulsarComponent comp = new PulsarComponent(context); comp.setAutoConfiguration(autoConfiguration); comp.setPulsarClient(pulsarClient); comp.setAllowManualAcknowledgement(true); // Set to true here instead of // the endpoint query // parameter. - jndi.bind("pulsar", comp); + registry.bind("pulsar", comp); } private PulsarClient givenPulsarClient() throws PulsarClientException { diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java index eea202e..3cdb832 100644 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java +++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java @@ -24,7 +24,8 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.pulsar.utils.AutoConfiguration; import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders; -import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.Registry; +import org.apache.camel.support.SimpleRegistry; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -68,25 +69,25 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport { } @Override - protected JndiRegistry createRegistry() throws Exception { - JndiRegistry jndi = super.createRegistry(); + protected Registry createCamelRegistry() throws Exception { + Registry registry = new SimpleRegistry(); - registerPulsarBeans(jndi); + registerPulsarBeans(registry); - return jndi; + return registry; } - private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException { + private void registerPulsarBeans(final Registry registry) throws PulsarClientException { PulsarClient pulsarClient = givenPulsarClient(); AutoConfiguration autoConfiguration = new AutoConfiguration(null, null); - jndi.bind("pulsarClient", pulsarClient); + registry.bind("pulsarClient", pulsarClient); PulsarComponent comp = new PulsarComponent(context); comp.setAutoConfiguration(autoConfiguration); comp.setPulsarClient(pulsarClient); // Test adding a custom PulsarMessageReceiptFactory comp.setPulsarMessageReceiptFactory(mockPulsarMessageReceiptFactory); - jndi.bind("pulsar", comp); + registry.bind("pulsar", comp); } private PulsarClient givenPulsarClient() throws PulsarClientException { diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java deleted file mode 100644 index abb76a0..0000000 --- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.pulsar; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.MessageId; -import org.junit.Test; - -import static org.mockito.Mockito.mock; - -public class PulsarNegativeAcknowledgementTest { - - @Test(expected = UnsupportedOperationException.class) - public void testNegativeAcknowledgement() { - PulsarMessageReceipt receipt = new DefaultPulsarMessageReceipt(mock(Consumer.class), mock(MessageId.class)); - receipt.negativeAcknowledge(); - } - -}