This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new cc75c3cb9d9 CAMEL-15252: Allow manual acknowlegement when ackMode=NONE 
(#13061)
cc75c3cb9d9 is described below

commit cc75c3cb9d9c6852fac3a9c98da92a6faeb104d9
Author: Brice Frisco <39070938+bricefri...@users.noreply.github.com>
AuthorDate: Fri Feb 9 08:44:46 2024 -0600

    CAMEL-15252: Allow manual acknowlegement when ackMode=NONE (#13061)
    
    * CAMEL-15252: Allow manual acknowlegement when ackMode=NONE
    
    * CAMEL-15252: Formatting and autogeneration
    
    * CAMEL-15252: Added documentation for manual acknowledgement
    
    * CAMEL-15252: Fixed typo
---
 .../component/google/pubsub/google-pubsub.json     |   3 +-
 .../src/main/docs/google-pubsub-component.adoc     |  18 +++
 .../google/pubsub/GooglePubsubConstants.java       |   4 +
 .../google/pubsub/GooglePubsubConsumer.java        |  24 ++--
 .../google/pubsub/consumer/AcknowledgeAsync.java   |   7 +-
 ...wledgeAsync.java => AcknowledgeCompletion.java} |  14 +--
 .../google/pubsub/consumer/AcknowledgeSync.java    |  20 ++-
 .../pubsub/consumer/CamelMessageReceiver.java      |  10 +-
 ...edgeAsync.java => GooglePubsubAcknowledge.java} |  32 +++--
 .../integration/ManualAcknowledgementIT.java       | 138 +++++++++++++++++++++
 10 files changed, 219 insertions(+), 51 deletions(-)

diff --git 
a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
 
b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
index 07671f770f3..91b20b2bbbd 100644
--- 
a/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
+++ 
b/components/camel-google/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -39,7 +39,8 @@
     "CamelGooglePubsubMsgAckId": { "index": 1, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The ID used to acknowledge the received 
message.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID" },
     "CamelGooglePubsubPublishTime": { "index": 2, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "com.google.protobuf.Timestamp", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": "The 
time at which the message was published", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#PUBLISH_TIME" },
     "CamelGooglePubsubAttributes": { "index": 3, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "Map<String, String>", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The attributes of the 
message.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ATTRIBUTES" },
-    "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "If non-empty, identifies related 
messages for which publish order should be respected.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" }
+    "CamelGooglePubsubOrderingKey": { "index": 4, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "If non-empty, identifies related 
messages for which publish order should be respected.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePubsubConstants#ORDERING_KEY" },
+    "CamelGooglePubsubAcknowledge": { "index": 5, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Can be used to manually acknowledge or 
negative-acknowledge a message when ackMode=NONE.", "constantName": 
"org.apache.camel.component.google.pubsub.GooglePub [...]
   },
   "properties": {
     "projectId": { "index": 0, "kind": "path", "displayName": "Project Id", 
"group": "common", "label": "common", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The Google Cloud PubSub 
Project Id" },
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
 
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 0f27848a59b..ebe76d07f19 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++ 
b/components/camel-google/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -125,3 +125,21 @@ setting the message header 
`GooglePubsubConstants.ACK_DEADLINE` to the value in
 
 
 include::spring-boot:partial$starter.adoc[]
+
+== Manual Acknowledgement
+
+By default, the PubSub consumer will acknowledge messages once the exchange 
has been processed, or negative-acknowledge them if the exchange has failed.
+
+If the _ackMode_ option is set to `NONE`, the component will not acknowledge 
messages, and it is up to the route to do so.
+In this case, a `GooglePubsubAcknowledge` object is stored in the header 
`GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE` and can be used to 
acknowledge messages:
+
+[source,java]
+----
+from("google-pubsub:{{project.name}}:{{subscription.name}}?ackMode=NONE")
+    .process(exchange -> {
+        GooglePubsubAcknowledge acknowledge = 
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, 
GooglePubsubAcknowledge.class);
+        acknowledge.ack(exchange); // or .nack(exchange)
+    });
+----
+
+Manual acknowledgement works with both the asynchronous and synchronous 
consumers and will use the acknowledgement id which is stored in 
`GooglePubsubConstants.ACK_ID`.
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index 2a53f5e3928..efe6974356e 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -34,6 +34,10 @@ public final class GooglePubsubConstants {
                             " respected.",
               javaType = "String")
     public static final String ORDERING_KEY = "CamelGooglePubsubOrderingKey";
+    @Metadata(label = "consumer", description = "Can be used to manually 
acknowledge or negative-acknowledge a " +
+                                                "message when ackMode=NONE.",
+              javaType = 
"org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge")
+    public static final String GOOGLE_PUBSUB_ACKNOWLEDGE = 
"CamelGooglePubsubAcknowledge";
     public static final String RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX = 
"goog";
 
     public enum AckMode {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 6a7172782b4..40c1fecab1a 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -40,20 +40,22 @@ import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
 import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
 import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
+import 
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
 import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class GooglePubsubConsumer extends DefaultConsumer {
 
-    private Logger localLog;
+    private final Logger localLog;
 
     private final GooglePubsubEndpoint endpoint;
     private final Processor processor;
     private ExecutorService executor;
-    private List<Subscriber> subscribers;
+    private final List<Subscriber> subscribers;
     private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;
 
     GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
@@ -188,18 +190,18 @@ public class GooglePubsubConsumer extends DefaultConsumer 
{
                         
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId());
                         
exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, 
pubsubMessage.getMessageId());
                         
exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, 
pubsubMessage.getPublishTime());
+                        
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
 
-                        if (null != pubsubMessage.getAttributesMap()) {
-                            
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
-                        }
+                        //existing subscriber can not be propagated, because 
it will be closed at the end of this block
+                        //subscriber will be created at the moment of use
+                        // (see  
https://issues.apache.org/jira/browse/CAMEL-18447)
+                        GooglePubsubAcknowledge acknowledge = new 
AcknowledgeSync(
+                                () -> 
endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName);
 
                         if (endpoint.getAckMode() != 
GooglePubsubConstants.AckMode.NONE) {
-                            //existing subscriber can not be propagated, 
because it will be closed at the end of this block
-                            //subscriber will be created at the moment of use
-                            // (see  
https://issues.apache.org/jira/browse/CAMEL-18447)
-                            exchange.getExchangeExtension()
-                                    .addOnCompletion(new AcknowledgeSync(
-                                            () -> 
endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName));
+                            
exchange.getExchangeExtension().addOnCompletion(new 
AcknowledgeCompletion(acknowledge));
+                        } else {
+                            
exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, 
acknowledge);
                         }
 
                         try {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
index 1a93db36188..72cdc9ada2f 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
@@ -18,9 +18,8 @@ package org.apache.camel.component.google.pubsub.consumer;
 
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.Synchronization;
 
-public class AcknowledgeAsync implements Synchronization {
+public class AcknowledgeAsync implements GooglePubsubAcknowledge {
 
     private final AckReplyConsumer ackReplyConsumer;
 
@@ -29,12 +28,12 @@ public class AcknowledgeAsync implements Synchronization {
     }
 
     @Override
-    public void onComplete(Exchange exchange) {
+    public void ack(Exchange exchange) {
         ackReplyConsumer.ack();
     }
 
     @Override
-    public void onFailure(Exchange exchange) {
+    public void nack(Exchange exchange) {
         ackReplyConsumer.nack();
     }
 }
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java
similarity index 75%
copy from 
components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
copy to 
components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java
index 1a93db36188..6b222fa3676 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeCompletion.java
@@ -16,25 +16,23 @@
  */
 package org.apache.camel.component.google.pubsub.consumer;
 
-import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Synchronization;
 
-public class AcknowledgeAsync implements Synchronization {
+public class AcknowledgeCompletion implements Synchronization {
+    private final GooglePubsubAcknowledge acknowledge;
 
-    private final AckReplyConsumer ackReplyConsumer;
-
-    public AcknowledgeAsync(AckReplyConsumer ackReplyConsumer) {
-        this.ackReplyConsumer = ackReplyConsumer;
+    public AcknowledgeCompletion(GooglePubsubAcknowledge acknowledge) {
+        this.acknowledge = acknowledge;
     }
 
     @Override
     public void onComplete(Exchange exchange) {
-        ackReplyConsumer.ack();
+        acknowledge.ack(exchange);
     }
 
     @Override
     public void onFailure(Exchange exchange) {
-        ackReplyConsumer.nack();
+        acknowledge.nack(exchange);
     }
 }
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
index 0c2fa0528f3..5cefe15919f 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -22,12 +22,12 @@ import java.util.concurrent.Callable;
 
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ModifyAckDeadlineRequest;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
-import org.apache.camel.spi.Synchronization;
 
-public class AcknowledgeSync implements Synchronization {
+public class AcknowledgeSync implements GooglePubsubAcknowledge {
 
     //Supplier cannot be used because of thrown exception (Callback used 
instead)
     private final Callable<SubscriberStub> subscriberStubSupplier;
@@ -39,7 +39,7 @@ public class AcknowledgeSync implements Synchronization {
     }
 
     @Override
-    public void onComplete(Exchange exchange) {
+    public void ack(Exchange exchange) {
         AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
                 .addAllAckIds(getAckIdList(exchange))
                 .setSubscription(subscriptionName).build();
@@ -51,7 +51,19 @@ public class AcknowledgeSync implements Synchronization {
     }
 
     @Override
-    public void onFailure(Exchange exchange) {
+    public void nack(Exchange exchange) {
+        // There is no explicit nack on the subscriber client. Using 
modifyAckDeadline with 0 seconds
+        // is the recommended way to nack a message. 
https://github.com/googleapis/python-pubsub/pull/123
+        ModifyAckDeadlineRequest nackRequest = 
ModifyAckDeadlineRequest.newBuilder()
+                .addAllAckIds(getAckIdList(exchange))
+                .setSubscription(subscriptionName)
+                .setAckDeadlineSeconds(0).build();
+
+        try (SubscriberStub subscriber = subscriberStubSupplier.call()) {
+            subscriber.modifyAckDeadlineCallable().call(nackRequest);
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
     }
 
     private List<String> getAckIdList(Exchange exchange) {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index 470d7396d80..d7a267cc81d 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -57,13 +57,13 @@ public class CamelMessageReceiver implements 
MessageReceiver {
 
         exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, 
pubsubMessage.getMessageId());
         exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, 
pubsubMessage.getPublishTime());
+        exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
 
-        if (null != pubsubMessage.getAttributesMap()) {
-            exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, 
pubsubMessage.getAttributesMap());
-        }
-
+        GooglePubsubAcknowledge acknowledge = new 
AcknowledgeAsync(ackReplyConsumer);
         if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
-            exchange.getExchangeExtension().addOnCompletion(new 
AcknowledgeAsync(ackReplyConsumer));
+            exchange.getExchangeExtension().addOnCompletion(new 
AcknowledgeCompletion(acknowledge));
+        } else {
+            
exchange.getIn().setHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, 
acknowledge);
         }
 
         try {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java
similarity index 62%
copy from 
components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
copy to 
components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java
index 1a93db36188..5d505d3dce5 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/GooglePubsubAcknowledge.java
@@ -16,25 +16,21 @@
  */
 package org.apache.camel.component.google.pubsub.consumer;
 
-import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.Synchronization;
 
-public class AcknowledgeAsync implements Synchronization {
-
-    private final AckReplyConsumer ackReplyConsumer;
-
-    public AcknowledgeAsync(AckReplyConsumer ackReplyConsumer) {
-        this.ackReplyConsumer = ackReplyConsumer;
-    }
-
-    @Override
-    public void onComplete(Exchange exchange) {
-        ackReplyConsumer.ack();
-    }
+/**
+ * Can be used for acknowledging or negative-acknowledging a PubSub message 
when using the consumer.
+ */
+public interface GooglePubsubAcknowledge {
+    /**
+     * Acknowledges message using the corresponding
+     * {@link 
org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID}
+     */
+    void ack(Exchange exchange);
 
-    @Override
-    public void onFailure(Exchange exchange) {
-        ackReplyConsumer.nack();
-    }
+    /**
+     * Negative-acknowledges message using the corresponding
+     * {@link 
org.apache.camel.component.google.pubsub.GooglePubsubConstants#ACK_ID}
+     */
+    void nack(Exchange exchange);
 }
diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
new file mode 100644
index 00000000000..86b0b29f031
--- /dev/null
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import 
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManualAcknowledgementIT extends PubsubTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ManualAcknowledgementIT.class);
+
+    private static final String TOPIC_NAME = "manualAcknowledgeTopic";
+    private static final String SUBSCRIPTION_NAME = 
"manualAcknowledgeSubscription";
+    private static final String SYNC_ROUTE_ID = 
"receive-from-subscription-sync";
+    private static final String ASYNC_ROUTE_ID = 
"receive-from-subscription-async";
+    private static Boolean ack = true;
+
+    @EndpointInject("mock:receiveResultAsync")
+    private MockEndpoint receiveResultAsync;
+
+    @EndpointInject("mock:receiveResultSync")
+    private MockEndpoint receiveResultSync;
+
+    @Produce("direct:in")
+    private ProducerTemplate producer;
+
+    @Override
+    public void createTopicSubscription() {
+        createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 1);
+    }
+
+    @Override
+    public RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:in")
+                        .routeId("send-to-topic")
+                        .to("google-pubsub:{{project.id}}:" + TOPIC_NAME);
+
+                from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?ackMode=NONE")
+                        .autoStartup(false)
+                        .routeId(ASYNC_ROUTE_ID)
+                        .to("mock:receiveResultAsync")
+                        .process(exchange -> {
+                            GooglePubsubAcknowledge acknowledge
+                                    = 
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
+                                            GooglePubsubAcknowledge.class);
+
+                            if (ManualAcknowledgementIT.ack) {
+                                acknowledge.ack(exchange);
+                            } else {
+                                LOG.debug("Nack!");
+                                acknowledge.nack(exchange);
+                            }
+                        });
+
+                from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=true&ackMode=NONE")
+                        .autoStartup(false)
+                        .routeId(SYNC_ROUTE_ID)
+                        .to("mock:receiveResultSync")
+                        .process(exchange -> {
+                            GooglePubsubAcknowledge acknowledge
+                                    = 
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
+                                            GooglePubsubAcknowledge.class);
+
+                            if (ManualAcknowledgementIT.ack) {
+                                acknowledge.ack(exchange);
+                            } else {
+                                LOG.debug("Nack!");
+                                acknowledge.nack(exchange);
+                            }
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testManualAcknowledgement() throws Exception {
+        // 1. Asynchronous consumer with manual acknowledgement.
+        // Message should only be received once.
+        producer.sendBody("Testing!");
+        receiveResultAsync.expectedMessageCount(1);
+        context.getRouteController().startRoute(ASYNC_ROUTE_ID);
+        receiveResultAsync.assertIsSatisfied(3000);
+        context.getRouteController().stopRoute(ASYNC_ROUTE_ID);
+
+        // 2. Synchronous consumer with manual acknowledgement.
+        // Message should only be received once.
+        producer.sendBody("Testing!");
+        receiveResultSync.expectedMessageCount(1);
+        context.getRouteController().startRoute(SYNC_ROUTE_ID);
+        receiveResultSync.assertIsSatisfied(3000);
+        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+
+        receiveResultSync.reset();
+        receiveResultAsync.reset();
+        ack = false;
+
+        // 3. Asynchronous consumer with manual negative-acknowledgement.
+        // Message should be continuously redelivered after being nacked.
+        producer.sendBody("Testing!");
+        receiveResultAsync.expectedMinimumMessageCount(3);
+        context.getRouteController().startRoute(ASYNC_ROUTE_ID);
+        receiveResultAsync.assertIsSatisfied(3000);
+        context.getRouteController().stopRoute(ASYNC_ROUTE_ID);
+
+        // 4. Synchronous consumer with manual negative-acknowledgement.
+        // Message should be continuously redelivered after being nacked.
+        producer.sendBody("Testing!");
+        receiveResultSync.expectedMinimumMessageCount(3);
+        context.getRouteController().startRoute(SYNC_ROUTE_ID);
+        receiveResultSync.assertIsSatisfied(3000);
+        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+    }
+}

Reply via email to