This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
     new c3fbd41  CAMEL-13841: (Backport to 2.x) Allow manual Pulsar message acknowledgments (#3347)
c3fbd41 is described below

commit c3fbd41b167070c1662677824f97e08546009167
Author: Masa Horiyama <45579221+masahoriyama-to...@users.noreply.github.com>
AuthorDate: Mon Nov 18 13:16:43 2019 -0500

    CAMEL-13841: (Backport to 2.x) Allow manual Pulsar message acknowledgments (#3347)

    * Backport CAMEL-13841: Allow manual Pulsar message acknowledgments

    * Update Javadocs.

    * Backport manual acknowledgement documentation added in https://github.com/apache/camel/pull/3340
      CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields.

    * Fix checkstyle issues.

    * Fix ordering in doc.
---
 .../src/main/docs/pulsar-component.adoc            |  10 +-
 .../pulsar/DefaultPulsarMessageReceipt.java        |  67 +++++++
 ...ava => DefaultPulsarMessageReceiptFactory.java} |  25 ++-
 .../camel/component/pulsar/PulsarComponent.java    |  32 ++++
 .../camel/component/pulsar/PulsarEndpoint.java     |   5 +
 .../component/pulsar/PulsarMessageListener.java    |  11 +-
 .../component/pulsar/PulsarMessageReceipt.java     |  70 ++++++++
 ...aders.java => PulsarMessageReceiptFactory.java} |  31 ++--
 .../pulsar/configuration/PulsarConfiguration.java  |  43 +++++
 .../consumers/CommonCreationStrategyImpl.java      |   3 +
 .../pulsar/utils/message/PulsarMessageHeaders.java |   1 +
 .../component/pulsar/PulsarComponentTest.java      |  14 ++
 .../pulsar/PulsarConsumerAcknowledgementTest.java  | 200 +++++++++++++++++++++
 .../PulsarConsumerNoAcknowledgementTest.java       | 103 +++++++++++
 .../pulsar/PulsarCustomMessageReceiptTest.java     | 135 ++++++++++++++
 .../pulsar/PulsarNegativeAcknowledgementTest.java} |  28 +--
 .../springboot/PulsarComponentConfiguration.java   |  32 ++++
 17 files changed, 764 insertions(+), 46 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 2638788..164ade4 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -29,7 +29,7 @@ pulsar:[persistent|non-persistent]://tenant/namespace/topic
 // component options: START
-The Apache Pulsar component supports 3 options, which are listed below.
+The Apache Pulsar component supports 5 options, which are listed below.
@@ -38,6 +38,8 @@ The Apache Pulsar component supports 3 options, which are listed below.
 | Name | Description | Default | Type
 | *autoConfiguration* (common) | The pulsar autoconfiguration | | AutoConfiguration
 | *pulsarClient* (common) | The pulsar client | | PulsarClient
+| *allowManual Acknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
+| *pulsarMessageReceipt Factory* (consumer) | Provide a factory to create an alternate implementation of PulsarMessageReceipt. | | PulsarMessageReceipt Factory
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |===
 // component options: END
@@ -65,12 +67,15 @@ with the following path and query parameters:
 |===
-=== Query Parameters (20 parameters):
+=== Query Parameters (23 parameters):
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *ackGroupTimeMillis* (consumer) | Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100 | 100 | long
+| *ackTimeoutMillis* (consumer) | Timeout for unacknowledged messages in milliseconds - defaults to 10000 | 10000 | long
+| *allowManualAcknowledgement* (consumer) | Whether to allow manual message acknowledgements. If this option is enabled, then messages are not immediately acknowledged after being consumed. Instead, an instance of PulsarMessageReceipt is stored as a header on the org.apache.camel.Exchange. Messages can then be acknowledged using PulsarMessageReceipt at any time before the ackTimeout occurs. | false | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerName* (consumer) | Name of the consumer when subscription is EXCLUSIVE | sole-consumer | String
 | *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a SHARED or FAILOVER subscription is used | cons | String
@@ -149,5 +154,6 @@ The component supports 3 options, which are listed below.
 | `key` | `String` | The key of the Pulsar message in String form or the empty string if unset on the Pulsar message
 | `key_bytes` | `byte[]` | The bytes in the key. If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, the UTF-8 encoded bytes of the string.
 | `topic_name` | `String` | The topic to which the message was published
+| `manual_acknowledgement` | `PulsarManualAcknowledgement` | If allowManualAcknowledgement is set, this will contain the object for manually acknowledging the Pulsar message; otherwise it is unset
 |===
 // message-headers options: END
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
new file mode 100644
index 0000000..39a3ae1
--- /dev/null
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceipt.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class DefaultPulsarMessageReceipt implements PulsarMessageReceipt {
+
+    private final Consumer consumer;
+
+    private final MessageId messageId;
+
+    public DefaultPulsarMessageReceipt(Consumer consumer, MessageId messageId) {
+        this.consumer = consumer;
+        this.messageId = messageId;
+    }
+
+    @Override
+    public void acknowledge() throws PulsarClientException {
+        consumer.acknowledge(messageId);
+    }
+
+    @Override
+    public void acknowledgeCumulative() throws PulsarClientException {
+        consumer.acknowledgeCumulative(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeAsync() {
+        return consumer.acknowledgeAsync(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync() {
+        return consumer.acknowledgeCumulativeAsync(messageId);
+    }
+
+    @Override
+    public void negativeAcknowledge() {
+        throw new UnsupportedOperationException("Negative acknowledge is not supported in this version of the Pulsar client.");
+    }
+
+    public Consumer getConsumer() {
+        return consumer;
+    }
+
+    public MessageId getMessageId() {
+        return messageId;
+    }
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
similarity index 56%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
index 1cfbaae..5acc97e 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/DefaultPulsarMessageReceiptFactory.java
@@ -14,20 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;
-public interface PulsarMessageHeaders {
+import org.apache.camel.Exchange;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+public class DefaultPulsarMessageReceiptFactory implements PulsarMessageReceiptFactory {
+
+    @Override
+    public PulsarMessageReceipt newInstance(Exchange exchange, Message message, Consumer consumer) {
+        return new DefaultPulsarMessageReceipt(consumer, message.getMessageId());
+    }
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
-    String KEY_OUT = "CamelPulsarProducerMessageKey";
-    String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
-    String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
index 61dc415..8d216fd 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarComponent.java
@@ -32,6 +32,10 @@ public class PulsarComponent extends DefaultComponent {
     private AutoConfiguration autoConfiguration;
     @Metadata
     private PulsarClient pulsarClient;
+    @Metadata(label = "consumer", defaultValue = "false")
+    private boolean allowManualAcknowledgement;
+    @Metadata(label = "consumer,advanced")
+    private PulsarMessageReceiptFactory pulsarMessageReceiptFactory = new DefaultPulsarMessageReceiptFactory();
     public PulsarComponent() {
         this(null);
@@ -45,6 +49,8 @@ public class PulsarComponent extends DefaultComponent {
     protected Endpoint createEndpoint(final String uri, final String path, final Map<String, Object> parameters) throws Exception {
         final PulsarConfiguration configuration = new PulsarConfiguration();
+        configuration.setAllowManualAcknowledgement(isAllowManualAcknowledgement());
+
         setProperties(configuration, parameters);
         if (autoConfiguration != null) {
             setProperties(autoConfiguration, parameters);
@@ -78,4 +84,30 @@ public class PulsarComponent extends DefaultComponent {
     public void setPulsarClient(PulsarClient pulsarClient) {
         this.pulsarClient = pulsarClient;
     }
+
+    public boolean isAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    /**
+     * Whether to allow manual message acknowledgements.
+     * <p/>
+     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
+     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
+     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     */
+    public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public PulsarMessageReceiptFactory getPulsarMessageReceiptFactory() {
+        return pulsarMessageReceiptFactory;
+    }
+
+    /**
+     * Provide a factory to create an alternate implementation of {@link PulsarMessageReceipt}.
+     */
+    public void setPulsarMessageReceiptFactory(PulsarMessageReceiptFactory pulsarMessageReceiptFactory) {
+        this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
+    }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
index 67574e9..8c6d993 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarEndpoint.java
@@ -89,4 +89,9 @@ public class PulsarEndpoint extends DefaultEndpoint {
         return topicUri;
     }
+    @Override
+    public PulsarComponent getComponent() {
+        return (PulsarComponent) super.getComponent();
+    }
+
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 23ed8c4..6f2876f 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pulsar;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.pulsar.client.api.Consumer;
@@ -45,8 +46,14 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
         final Exchange exchange = PulsarMessageUtils.updateExchange(message, endpoint.createExchange());
         try {
-            processor.process(exchange);
-            consumer.acknowledge(message.getMessageId());
+            if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+                exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+                    endpoint.getComponent().getPulsarMessageReceiptFactory().newInstance(exchange, message, consumer));
+                processor.process(exchange);
+            } else {
+                processor.process(exchange);
+                consumer.acknowledge(message.getMessageId());
+            }
         } catch (Exception exception) {
             handleProcessorException(exchange, exception);
         }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
new file mode 100644
index 0000000..58b4cbf
--- /dev/null
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceipt.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Acknowledge the receipt of a message using the Pulsar consumer.
+ * <p>
+ * Available on the {@link Exchange} if {@link PulsarConfiguration#isAllowManualAcknowledgement()} is true.
+ * An alternative to the default may be provided by implementing {@link PulsarMessageReceiptFactory}.
+ */
+public interface PulsarMessageReceipt {
+
+    /**
+     * Acknowledge receipt of this message synchronously.
+     *
+     * @see org.apache.pulsar.client.api.Consumer#acknowledge(MessageId)
+     */
+    void acknowledge() throws PulsarClientException;
+
+    /**
+     * Acknowledge receipt of all of the messages in the stream up to and including this message synchronously.
+     *
+     * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulative(MessageId)
+     */
+    void acknowledgeCumulative() throws PulsarClientException;
+
+    /**
+     * Acknowledge receipt of this message asynchronously.
+     *
+     * @see org.apache.pulsar.client.api.Consumer#acknowledgeAsync(MessageId)
+     */
+    CompletableFuture<Void> acknowledgeAsync();
+
+    /**
+     * Acknowledge receipt of all of the messages in the stream up to and including this message asynchronously.
+     *
+     * @see org.apache.pulsar.client.api.Consumer#acknowledgeCumulativeAsync(MessageId)
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync();
+
+    /**
+     * Acknowledge the failure to process this message.
+     *
+     * @see org.apache.pulsar.client.api.Consumer#negativeAcknowledge(MessageId)
+     * Note: Available in Puslar 2.4.0. Implementations with earlier versions should return an {@link java.lang.UnsupportedOperationException}.
+     */
+    void negativeAcknowledge();
+
+}
+
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
similarity index 52%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
index 1cfbaae..9c39a74 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageReceiptFactory.java
@@ -14,20 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;
-public interface PulsarMessageHeaders {
+import org.apache.camel.Exchange;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+
+/**
+ * Factory to create a new {@link PulsarMessageReceipt} to store on the {@link Exchange}.
+ * <p>
+ * Implement this interface if an alternate implementation of {@link PulsarMessageReceipt} is required
+ * as newer Pulsar clients may have acknowledgement functionality not yet supported by {@link DefaultPulsarMessageReceipt}.
+ */
+public interface PulsarMessageReceiptFactory {
+
+    /**
+     * Creates a new instance of {@link PulsarMessageReceipt}.
+     */
+    PulsarMessageReceipt newInstance(Exchange exchange, Message message, Consumer consumer);
-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
-    String KEY_OUT = "CamelPulsarProducerMessageKey";
-    String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
-    String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 20c29eb..d9c6fd3 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -42,6 +42,12 @@ public class PulsarConfiguration {
     private String producerName;
     @UriParam(label = "consumer", defaultValue = "cons")
     private String consumerNamePrefix = "cons";
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean allowManualAcknowledgement;
+    @UriParam(label = "consumer", defaultValue = "10000")
+    private long ackTimeoutMillis = 10000;
+    @UriParam(label = "consumer", defaultValue = "100")
+    private long ackGroupTimeMillis = 100;
     @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
@@ -141,6 +147,43 @@ public class PulsarConfiguration {
         this.consumerNamePrefix = consumerNamePrefix;
     }
+    public boolean isAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    /**
+     * Whether to allow manual message acknowledgements.
+     * <p/>
+     * If this option is enabled, then messages are not immediately acknowledged after being consumed.
+     * Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the {@link org.apache.camel.Exchange}.
+     * Messages can then be acknowledged using {@link PulsarMessageReceipt} at any time before the ackTimeout occurs.
+     */
+    public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public long getAckTimeoutMillis() {
+        return ackTimeoutMillis;
+    }
+
+    /**
+     * Timeout for unacknowledged messages in milliseconds - defaults to 10000
+     */
+    public void setAckTimeoutMillis(long ackTimeoutMillis) {
+        this.ackTimeoutMillis = ackTimeoutMillis;
+    }
+
+    public long getAckGroupTimeMillis() {
+        return ackGroupTimeMillis;
+    }
+
+    /**
+     * Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
+     */
+    public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
+        this.ackGroupTimeMillis = ackGroupTimeMillis;
+    }
+
     /**
      * Send timeout in milliseconds.
      * Defaults to 30,000ms (30 seconds)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index f8e6ed1..274ef79 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.pulsar.utils.consumers;
+import java.util.concurrent.TimeUnit;
 import org.apache.camel.component.pulsar.PulsarConsumer;
 import org.apache.camel.component.pulsar.PulsarEndpoint;
 import org.apache.camel.component.pulsar.PulsarMessageListener;
@@ -32,6 +33,8 @@ public final class CommonCreationStrategyImpl {
         return pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getTopicUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
             .receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name)
+            .ackTimeout(endpointConfiguration.getAckTimeoutMillis(), TimeUnit.MILLISECONDS)
+            .acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(), TimeUnit.MILLISECONDS)
             .messageListener(new PulsarMessageListener(pulsarEndpoint, pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
     }
 }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index 1cfbaae..8051b2d 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -27,6 +27,7 @@ public interface PulsarMessageHeaders {
     String KEY = "key";
     String KEY_BYTES = "key_bytes";
     String TOPIC_NAME = "topic_name";
+    String MESSAGE_RECEIPT = "message_receipt";
     String KEY_OUT = "CamelPulsarProducerMessageKey";
     String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
     String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
index 5764f50..087bafb 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarComponentTest.java
@@ -62,6 +62,7 @@ public class PulsarComponentTest extends CamelTestSupport {
         assertNull(endpoint.getPulsarConfiguration().getProducerName());
         assertEquals("subs", endpoint.getPulsarConfiguration().getSubscriptionName());
         assertEquals(SubscriptionType.EXCLUSIVE, endpoint.getPulsarConfiguration().getSubscriptionType());
+        assertFalse(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
     }
     @Test
@@ -74,4 +75,17 @@ public class PulsarComponentTest extends CamelTestSupport {
         verify(autoConfiguration).ensureNameSpaceAndTenant(Matchers.anyString());
     }
+
+    @Test
+    public void testPulsarEndpointAllowManualAcknowledgementDefaultTrue() throws Exception {
+        PulsarComponent component = new PulsarComponent(context);
+        component.setAllowManualAcknowledgement(true);
+
+        // allowManualAcknowledgement is absent as a query parameter.
+        PulsarEndpoint endpoint = (PulsarEndpoint)component.createEndpoint("pulsar://persistent/test/foobar/BatchCreated");
+
+        assertNotNull(endpoint);
+        assertTrue(endpoint.getPulsarConfiguration().isAllowManualAcknowledgement());
+    }
+
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
new file mode 100644
index 0000000..567acd7
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumerAcknowledgementTest.class);
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI + "?numberOfConsumers=1&subscriptionType=Exclusive" + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer" + "&allowManualAcknowledgement=true" + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    private Producer<String> producer;
+
+    @Before
+    public void setup() throws Exception {
+        context.removeRoute("myRoute");
+        producer = givenPulsarClient().newProducer(Schema.STRING)
+            .producerName(PRODUCER)
+            .topic(TOPIC_URI)
+            .create();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+            .serviceUrl(getPulsarBrokerUrl())
+            .ioThreads(1)
+            .listenerThreads(1)
+            .build();
+    }
+
+    @Test
+    public void testAcknowledge() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                        .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeAsync() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                        .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    try {
+                        CompletableFuture<Void> f = receipt.acknowledgeAsync();
+                        f.get();
+                    } catch (Exception e) {
+                        LOGGER.error(e.getMessage());
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeCumulative() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                        .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    // Ack the second message. The first will also be acked.
+                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                        receipt.acknowledgeCumulative();
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+        producer.send("Hello World Again!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+    @Test
+    public void testAcknowledgeCumulativeAsync() throws Exception {
+        to.expectsNoDuplicates(body());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                        .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    // Ack the second message. The first will also be acked.
+                    if (exchange.getIn().getBody().equals("Hello World Again!")) {
+                        try {
+                            CompletableFuture<Void> f = receipt.acknowledgeCumulativeAsync();
+                            f.get();
+                        } catch (Exception e) {
+                            LOGGER.error(e.getMessage());
+                        }
+                    }
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+        producer.send("Hello World Again!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
new file mode 100644
index 0000000..8876bd9
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarConsumerNoAcknowledgementTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+        + "?numberOfConsumers=1&subscriptionType=Exclusive"
+        + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+        + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Nothing in the route will ack the message.
+                from(from).to(to);
+            }
+        };
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        comp.setAllowManualAcknowledgement(true); // Set to true here instead of the endpoint query parameter.
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+            .serviceUrl(getPulsarBrokerUrl())
+            .ioThreads(1)
+            .listenerThreads(1)
+            .build();
+    }
+
+    @Test
+    public void testAMessageIsConsumedMultipleTimes() throws Exception {
+        to.expectedMinimumMessageCount(2);
+
+        Producer<String> producer = givenPulsarClient().newProducer(Schema.STRING)
+            .producerName(PRODUCER)
+            .topic(TOPIC_URI)
+            .create();
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+    }
+
+}
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
new file mode 100644
index 0000000..34a8454
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarCustomMessageReceiptTest.class);
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-topic";
+    private static final String PRODUCER = "camel-producer-1";
+
+    private PulsarMessageReceiptFactory mockPulsarMessageReceiptFactory = mock(PulsarMessageReceiptFactory.class);
+
+    private PulsarMessageReceipt mockPulsarMessageReceipt = mock(PulsarMessageReceipt.class);
+
+    @EndpointInject(uri = "pulsar:" + TOPIC_URI
+        + "?numberOfConsumers=1&subscriptionType=Exclusive"
+        + "&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
+        + "&allowManualAcknowledgement=true"
+        + "&ackTimeoutMillis=1000"
+    )
+    private Endpoint from;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint to;
+
+    private Producer<String> producer;
+
+    @Before
+    public void setup() throws Exception {
+        producer = givenPulsarClient().newProducer(Schema.STRING)
+            .producerName(PRODUCER)
+            .topic(TOPIC_URI)
+            .create();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        registerPulsarBeans(jndi);
+
+        return jndi;
+    }
+
+    private void registerPulsarBeans(final JndiRegistry jndi) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        jndi.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        // Test adding a custom PulsarMessageReceiptFactory
+        comp.setPulsarMessageReceiptFactory(mockPulsarMessageReceiptFactory);
+        jndi.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+            .serviceUrl(getPulsarBrokerUrl())
+            .ioThreads(1)
+            .listenerThreads(1)
+            .build();
+    }
+
+    @Test
+    public void testAcknowledgeWithCustomMessageReceipt() throws Exception {
+        to.expectedMinimumMessageCount(2);
+
+        when(mockPulsarMessageReceiptFactory.newInstance(any(), any(), any())).thenReturn(mockPulsarMessageReceipt);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from(from).routeId("myRoute").to(to).process(exchange -> {
+                    LOGGER.info("Processing message {}", exchange.getIn().getBody());
+
+                    PulsarMessageReceipt receipt = (PulsarMessageReceipt) exchange.getIn()
+                        .getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+                    receipt.acknowledge();
+                });
+            }
+        });
+
+        producer.send("Hello World!");
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+        // The mock does not actually acknowledge using the pulsar consumer, so the message will be re-consumed
+        // after the ackTimeout.
+        verify(mockPulsarMessageReceipt, atLeast(2)).acknowledge();
+        verifyNoMoreInteractions(mockPulsarMessageReceipt);
+    }
+
+}
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
similarity index 56%
copy from components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
copy to components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
index 1cfbaae..3329bb6 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
@@ -14,20 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.pulsar.utils.message;
+package org.apache.camel.component.pulsar;

-public interface PulsarMessageHeaders {
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class PulsarNegativeAcknowledgementTest {
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testNegativeAcknowledgement() {
+        PulsarMessageReceipt receipt = new DefaultPulsarMessageReceipt(mock(Consumer.class), mock(MessageId.class));
+        receipt.negativeAcknowledge();
+    }

-    String PROPERTIES = "properties";
-    String PRODUCER_NAME = "producer_name";
-    String SEQUENCE_ID = "sequence_id";
-    String PUBLISH_TIME = "publish_time";
-    String MESSAGE_ID = "message_id";
-    String EVENT_TIME = "event_time";
-    String KEY = "key";
-    String KEY_BYTES = "key_bytes";
-    String TOPIC_NAME = "topic_name";
-    String KEY_OUT = "CamelPulsarProducerMessageKey";
-    String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
-    String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
 }
diff --git a/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
index 94d1679..336329a 100644
--- a/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-pulsar-starter/src/main/java/org/apache/camel/component/pulsar/springboot/PulsarComponentConfiguration.java
@@ -46,6 +46,21 @@ public class PulsarComponentConfiguration
      */
     private String pulsarClient;
     /**
+     * Whether to allow manual message acknowledgements. If this option is
+     * enabled, then messages are not immediately acknowledged after being
+     * consumed. Instead, an instance of PulsarMessageReceipt is stored as a
+     * header on the org.apache.camel.Exchange. Messages can then be
+     * acknowledged using PulsarMessageReceipt at any time before the ackTimeout
+     * occurs.
+     */
+    private Boolean allowManualAcknowledgement = false;
+    /**
+     * Provide a factory to create an alternate implementation of
+     * PulsarMessageReceipt. The option is a
+     * org.apache.camel.component.pulsar.PulsarMessageReceiptFactory type.
+     */
+    private String pulsarMessageReceiptFactory;
+    /**
      * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property
      * placeholders.
@@ -69,6 +84,23 @@ public class PulsarComponentConfiguration
         this.pulsarClient = pulsarClient;
     }

+    public Boolean getAllowManualAcknowledgement() {
+        return allowManualAcknowledgement;
+    }
+
+    public void setAllowManualAcknowledgement(Boolean allowManualAcknowledgement) {
+        this.allowManualAcknowledgement = allowManualAcknowledgement;
+    }
+
+    public String getPulsarMessageReceiptFactory() {
+        return pulsarMessageReceiptFactory;
+    }
+
+    public void setPulsarMessageReceiptFactory(
+            String pulsarMessageReceiptFactory) {
+        this.pulsarMessageReceiptFactory = pulsarMessageReceiptFactory;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }
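
The allowManualAcknowledgement Javadoc in this commit says that a message is not acknowledged on consumption and must be acknowledged through its receipt before the ackTimeout, and the tests rely on unacknowledged messages being consumed again. A minimal sketch of that redelivery behaviour follows, assuming a toy in-memory subscription in place of a real broker. ManualAckSketch, ToySubscription, Receipt and countDeliveries are hypothetical names for illustration only; they are not part of camel-pulsar or the Pulsar client API, and each call to deliverRound stands in for one ackTimeout expiry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of manual acknowledgement.
// None of these types belong to camel-pulsar or the Pulsar client.
class ManualAckSketch {

    /** Hypothetical stand-in for the receipt handed to the consumer with each delivery. */
    interface Receipt {
        void acknowledge();
    }

    /** Toy subscription: anything not acknowledged stays pending and is delivered again. */
    static final class ToySubscription {
        private final Deque<String> pending = new ArrayDeque<>();

        void publish(String body) {
            pending.add(body);
        }

        /** Delivers every pending message once and returns how many were delivered. */
        int deliverRound(BiConsumer<String, Receipt> handler) {
            // Copy first so that acknowledging inside the handler does not disturb iteration.
            List<String> batch = new ArrayList<>(pending);
            for (String body : batch) {
                handler.accept(body, () -> pending.remove(body));
            }
            return batch.size();
        }
    }

    /** Counts deliveries of a single message over the given number of timeout rounds. */
    static int countDeliveries(boolean ackInHandler, int rounds) {
        ToySubscription subscription = new ToySubscription();
        subscription.publish("Hello World!");
        int deliveries = 0;
        for (int i = 0; i < rounds; i++) {
            deliveries += subscription.deliverRound((body, receipt) -> {
                if (ackInHandler) {
                    receipt.acknowledge();
                }
            });
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("acknowledged: " + countDeliveries(true, 3));
        System.out.println("never acknowledged: " + countDeliveries(false, 3));
    }
}
```

In this model an acknowledged message is delivered once, while a message that is never acknowledged comes back on every round, which is the situation PulsarConsumerNoAcknowledgementTest checks with expectedMinimumMessageCount(2).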