This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new cc75c3cb9d9 CAMEL-15252: Allow manual acknowlegement when ackMode=NONE (#13061) cc75c3cb9d9 is described below commit cc75c3cb9d9c6852fac3a9c98da92a6faeb104d9 Author: Brice Frisco <39070938+bricefri...@users.noreply.github.com> AuthorDate: Fri Feb 9 08:44:46 2024 -0600 CAMEL-15252: Allow manual acknowlegement when ackMode=NONE (#13061) * CAMEL-15252: Allow manual acknowlegement when ackMode=NONE * CAMEL-15252: Formatting and autogeneration * CAMEL-15252: Added documentation for manual acknowledgement * CAMEL-15252: Fixed typo --- .../component/google/pubsub/google-pubsub.json | 3 +- .../src/main/docs/google-pubsub-component.adoc | 18 +++ .../google/pubsub/GooglePubsubConstants.java | 4 + .../google/pubsub/GooglePubsubConsumer.java | 24 ++-- .../google/pubsub/consumer/AcknowledgeAsync.java | 7 +- ...wledgeAsync.java => AcknowledgeCompletion.java} | 14 +-- .../google/pubsub/consumer/AcknowledgeSync.java | 20 ++- .../pubsub/consumer/CamelMessageReceiver.java | 10 +- ...edgeAsync.java => GooglePubsubAcknowledge.java} | 32 +++-- .../integration/ManualAcknowledgementIT.java | 138 +++++++++++++++++++++ 10 files changed, 219 insertions(+), 51 deletions(-) diff --git a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json index 07671f770f3..91b20b2bbbd 100644 --- a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json +++ b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json @@ -39,7 +39,8 @@ "CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ID used to acknowledge the received message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" }, "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.google.protobuf.Timestamp", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The time at which the message was published", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" }, "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The attributes of the message.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" }, - "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" } + "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "If non-empty, identifies related messages for which publish order should be respected.", "constantName": "org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }, + "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Can be used to manually acknowledge or negative-acknowledge a message when ackMode=NONE.", "constantName": "org.apache.camel.component.google.pubsub.GooglePub [...] }, "properties": { "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Google Cloud PubSub Project Id" }, diff --git a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc index 0f27848a59b..ebe76d07f19 100644 --- a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc +++ b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc @@ -125,3 +125,21 @@ setting the message header `GooglePubsubConstants.ACK_DEADLINE` to the value in include::spring-boot:partial$starter.adoc[] + +== Manual Acknowledgement + +By default, the PubSub consumer will acknowledge messages once the exchange has been processed, or negative-acknowledge them if the exchange has failed. + +If the _ackMode_ option is set to `NONE`, the component will not acknowledge messages, and it is up to the route to do so. +In this case, a `GooglePubsubAcknowledge` object is stored in the header `GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE` and can be used to acknowledge messages: + +[source,java] +---- +from("google-pubsub:{{project.name}}:{{subscription.name}}?ackMode=NONE") + .process(exchange -> { + GooglePubsubAcknowledge acknowledge = exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, GooglePubsubAcknowledge.class); + acknowledge.ack(exchange); // or .nack(exchange) + }); +---- + +Manual acknowledgement works with both the asynchronous and synchronous consumers and will use the acknowledgement id which is stored in `GooglePubsubConstants.ACK_ID`. diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java index 2a53f5e3928..efe6974356e 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java @@ -34,6 +34,10 @@ public final class GooglePubsubConstants { " respected.", javaType = "String") public static final String ORDERING_KEY = "CamelGooglePubsubOrderingKey"; + @Metadata(label = "consumer", description = "Can be used to manually acknowledge or negative-acknowledge a " + + "message when ackMode=NONE.", + javaType = "org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge") + public static final String GOOGLE_PUBSUB_ACKNOWLEDGE = "CamelGooglePubsubAcknowledge"; public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = "goog"; public enum AckMode { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 6a7172782b4..40c1fecab1a 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -40,20 +40,22 @@ import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync; import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver; +import org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge; import org.apache.camel.support.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GooglePubsubConsumer extends DefaultConsumer { - private Logger localLog; + private final Logger localLog; private final GooglePubsubEndpoint endpoint; private final Processor processor; private ExecutorService executor; - private List<Subscriber> subscribers; + private final List<Subscriber> subscribers; private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses; GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) { @@ -188,18 +190,18 @@ public class GooglePubsubConsumer extends DefaultConsumer { exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId()); exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, pubsubMessage.getMessageId()); exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime()); + exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); - if (null != pubsubMessage.getAttributesMap()) { - exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); - } + //existing subscriber can not be propagated, because it will be closed at the end of this block + //subscriber will be created at the moment of use + // (see https://issues.apache.org/jira/browse/CAMEL-18447) + GooglePubsubAcknowledge acknowledge = new AcknowledgeSync( + () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName); if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - //existing subscriber can not be propagated, because it will be closed at the end of this block - //subscriber will be created at the moment of use - // (see https://issues.apache.org/jira/browse/CAMEL-18447) - exchange.getExchangeExtension() - .addOnCompletion(new AcknowledgeSync( - () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName)); + exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge)); + } else { + exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, acknowledge); } try { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java index 1a93db36188..72cdc9ada2f 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java @@ -18,9 +18,8 @@ package org.apache.camel.component.google.pubsub.consumer; import com.google.cloud.pubsub.v1.AckReplyConsumer; import org.apache.camel.Exchange; -import org.apache.camel.spi.Synchronization; -public class AcknowledgeAsync implements Synchronization { +public class AcknowledgeAsync implements GooglePubsubAcknowledge { private final AckReplyConsumer ackReplyConsumer; @@ -29,12 +28,12 @@ public class AcknowledgeAsync implements Synchronization { } @Override - public void onComplete(Exchange exchange) { + public void ack(Exchange exchange) { ackReplyConsumer.ack(); } @Override - public void onFailure(Exchange exchange) { + public void nack(Exchange exchange) { ackReplyConsumer.nack(); } } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java similarity index 75% copy from components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java copy to components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java index 1a93db36188..6b222fa3676 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java @@ -16,25 +16,23 @@ */ package org.apache.camel.component.google.pubsub.consumer; -import com.google.cloud.pubsub.v1.AckReplyConsumer; import org.apache.camel.Exchange; import org.apache.camel.spi.Synchronization; -public class AcknowledgeAsync implements Synchronization { +public class AcknowledgeCompletion implements Synchronization { + private final GooglePubsubAcknowledge acknowledge; - private final AckReplyConsumer ackReplyConsumer; - - public AcknowledgeAsync(AckReplyConsumer ackReplyConsumer) { - this.ackReplyConsumer = ackReplyConsumer; + public AcknowledgeCompletion(GooglePubsubAcknowledge acknowledge) { + this.acknowledge = acknowledge; } @Override public void onComplete(Exchange exchange) { - ackReplyConsumer.ack(); + acknowledge.ack(exchange); } @Override public void onFailure(Exchange exchange) { - ackReplyConsumer.nack(); + acknowledge.nack(exchange); } } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java index 0c2fa0528f3..5cefe15919f 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java @@ -22,12 +22,12 @@ import java.util.concurrent.Callable; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.google.pubsub.GooglePubsubConstants; -import org.apache.camel.spi.Synchronization; -public class AcknowledgeSync implements Synchronization { +public class AcknowledgeSync implements GooglePubsubAcknowledge { //Supplier cannot be used because of thrown exception (Callback used instead) private final Callable<SubscriberStub> subscriberStubSupplier; @@ -39,7 +39,7 @@ public class AcknowledgeSync implements Synchronization { } @Override - public void onComplete(Exchange exchange) { + public void ack(Exchange exchange) { AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder() .addAllAckIds(getAckIdList(exchange)) .setSubscription(subscriptionName).build(); @@ -51,7 +51,19 @@ public class AcknowledgeSync implements Synchronization { } @Override - public void onFailure(Exchange exchange) { + public void nack(Exchange exchange) { + // There is no explicit nack on the subscriber client. Using modifyAckDeadline with 0 seconds + // is the recommended way to nack a message. https://github.com/googleapis/python-pubsub/pull/123 + ModifyAckDeadlineRequest nackRequest = ModifyAckDeadlineRequest.newBuilder() + .addAllAckIds(getAckIdList(exchange)) + .setSubscription(subscriptionName) + .setAckDeadlineSeconds(0).build(); + + try (SubscriberStub subscriber = subscriberStubSupplier.call()) { + subscriber.modifyAckDeadlineCallable().call(nackRequest); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } } private List<String> getAckIdList(Exchange exchange) { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index 470d7396d80..d7a267cc81d 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -57,13 +57,13 @@ public class CamelMessageReceiver implements MessageReceiver { exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, pubsubMessage.getMessageId()); exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, pubsubMessage.getPublishTime()); + exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); - if (null != pubsubMessage.getAttributesMap()) { - exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, pubsubMessage.getAttributesMap()); - } - + GooglePubsubAcknowledge acknowledge = new AcknowledgeAsync(ackReplyConsumer); if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - exchange.getExchangeExtension().addOnCompletion(new AcknowledgeAsync(ackReplyConsumer)); + exchange.getExchangeExtension().addOnCompletion(new AcknowledgeCompletion(acknowledge)); + } else { + exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, acknowledge); } try { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java similarity index 62% copy from components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java copy to components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java index 1a93db36188..5d505d3dce5 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java @@ -16,25 +16,21 @@ */ package org.apache.camel.component.google.pubsub.consumer; -import com.google.cloud.pubsub.v1.AckReplyConsumer; import org.apache.camel.Exchange; -import org.apache.camel.spi.Synchronization; -public class AcknowledgeAsync implements Synchronization { - - private final AckReplyConsumer ackReplyConsumer; - - public AcknowledgeAsync(AckReplyConsumer ackReplyConsumer) { - this.ackReplyConsumer = ackReplyConsumer; - } - - @Override - public void onComplete(Exchange exchange) { - ackReplyConsumer.ack(); - } +/** + * Can be used for acknowledging or negative-acknowledging a PubSub message when using the consumer. + */ +public interface GooglePubsubAcknowledge { + /** + * Acknowledges message using the corresponding + * {@link org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID} + */ + void ack(Exchange exchange); - @Override - public void onFailure(Exchange exchange) { - ackReplyConsumer.nack(); - } + /** + * Negative-acknowledges message using the corresponding + * {@link org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID} + */ + void nack(Exchange exchange); } diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java new file mode 100644 index 00000000000..86b0b29f031 --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.GooglePubsubConstants; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManualAcknowledgementIT extends PubsubTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ManualAcknowledgementIT.class); + + private static final String TOPIC_NAME = "manualAcknowledgeTopic"; + private static final String SUBSCRIPTION_NAME = "manualAcknowledgeSubscription"; + private static final String SYNC_ROUTE_ID = "receive-from-subscription-sync"; + private static final String ASYNC_ROUTE_ID = "receive-from-subscription-async"; + private static Boolean ack = true; + + @EndpointInject("mock:receiveResultAsync") + private MockEndpoint receiveResultAsync; + + @EndpointInject("mock:receiveResultSync") + private MockEndpoint receiveResultSync; + + @Produce("direct:in") + private ProducerTemplate producer; + + @Override + public void createTopicSubscription() { + createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 1); + } + + @Override + public RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:in") + .routeId("send-to-topic") + .to("google-pubsub:{{project.id}}:" + TOPIC_NAME); + + from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?ackMode=NONE") + .autoStartup(false) + .routeId(ASYNC_ROUTE_ID) + .to("mock:receiveResultAsync") + .process(exchange -> { + GooglePubsubAcknowledge acknowledge + = exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, + GooglePubsubAcknowledge.class); + + if (ManualAcknowledgementIT.ack) { + acknowledge.ack(exchange); + } else { + LOG.debug("Nack!"); + acknowledge.nack(exchange); + } + }); + + from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true&ackMode=NONE") + .autoStartup(false) + .routeId(SYNC_ROUTE_ID) + .to("mock:receiveResultSync") + .process(exchange -> { + GooglePubsubAcknowledge acknowledge + = exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, + GooglePubsubAcknowledge.class); + + if (ManualAcknowledgementIT.ack) { + acknowledge.ack(exchange); + } else { + LOG.debug("Nack!"); + acknowledge.nack(exchange); + } + }); + } + }; + } + + @Test + public void testManualAcknowledgement() throws Exception { + // 1. Asynchronous consumer with manual acknowledgement. + // Message should only be received once. + producer.sendBody("Testing!"); + receiveResultAsync.expectedMessageCount(1); + context.getRouteController().startRoute(ASYNC_ROUTE_ID); + receiveResultAsync.assertIsSatisfied(3000); + context.getRouteController().stopRoute(ASYNC_ROUTE_ID); + + // 2. Synchronous consumer with manual acknowledgement. + // Message should only be received once. + producer.sendBody("Testing!"); + receiveResultSync.expectedMessageCount(1); + context.getRouteController().startRoute(SYNC_ROUTE_ID); + receiveResultSync.assertIsSatisfied(3000); + context.getRouteController().stopRoute(SYNC_ROUTE_ID); + + receiveResultSync.reset(); + receiveResultAsync.reset(); + ack = false; + + // 3. Asynchronous consumer with manual negative-acknowledgement. + // Message should be continuously redelivered after being nacked. + producer.sendBody("Testing!"); + receiveResultAsync.expectedMinimumMessageCount(3); + context.getRouteController().startRoute(ASYNC_ROUTE_ID); + receiveResultAsync.assertIsSatisfied(3000); + context.getRouteController().stopRoute(ASYNC_ROUTE_ID); + + // 4. Synchronous consumer with manual negative-acknowledgement. + // Message should be continuously redelivered after being nacked. + producer.sendBody("Testing!"); + receiveResultSync.expectedMinimumMessageCount(3); + context.getRouteController().startRoute(SYNC_ROUTE_ID); + receiveResultSync.assertIsSatisfied(3000); + context.getRouteController().stopRoute(SYNC_ROUTE_ID); + } +}