This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 339bbb9fdb2 camel-google-pubsub: Fix test
339bbb9fdb2 is described below

commit 339bbb9fdb2a04039f3b6336a21f326139c8fd85
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Jun 12 10:55:19 2025 +0200

    camel-google-pubsub: Fix test
---
 .../pubsub/integration/AcknowledgementAsyncIT.java | 119 +++++++++++++++++++++
 ...ntIT.java => ManualAcknowledgementAsyncIT.java} |  40 ++++---
 .../integration/ManualAcknowledgementIT.java       |  25 ++---
 3 files changed, 149 insertions(+), 35 deletions(-)

diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
new file mode 100644
index 00000000000..9a29452c685
--- /dev/null
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.google.pubsub.PubsubTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AcknowledgementAsyncIT extends PubsubTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AcknowledgementAsyncIT.class);
+
+    private static final String TOPIC_NAME = "failureSingleAsync";
+    private static final String SUBSCRIPTION_NAME = "failureSubAsync";
+    private static Boolean fail = false;
+
+    @EndpointInject("direct:in")
+    private Endpoint directIn;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
+    private Endpoint pubsubTopic;
+
+    @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=false")
+    private Endpoint pubsubSubscription;
+
+    @EndpointInject("mock:receiveResult")
+    private MockEndpoint receiveResult;
+
+    @Produce("direct:in")
+    private ProducerTemplate producer;
+
+    @Override
+    public void createTopicSubscription() {
+        createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from(directIn).routeId("Send_to_Fail").to(pubsubTopic);
+
+                
from(pubsubSubscription).routeId("Fail_Receive").autoStartup(true).process(new 
Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        if (AcknowledgementAsyncIT.fail) {
+                            throw new Exception("fail");
+                        }
+                    }
+                }).to(receiveResult);
+            }
+        };
+    }
+
+    /**
+     * Testing acknowledgements. Three checks to be performed. Check 1 : 
Successful round trip. Message received and
+     * acknowledged. If the ACK fails for the first message, it will be 
delivered again for the second check and the
+     * body comparison will fail. Check 2 : Failure. The route throws an 
exception, so the message is NACK'ed. The
+     * message should remain in the PubSub Subscription for the third check. 
Check 3 : Success for the second message.
+     * The message received should match the second message sent.
+     */
+    @Test
+    public void singleMessage() throws Exception {
+
+        Exchange firstExchange = new DefaultExchange(context);
+        Exchange secondExchange = new DefaultExchange(context);
+
+        firstExchange.getIn().setBody("SUCCESS  : " + 
firstExchange.getExchangeId());
+        secondExchange.getIn().setBody("fail  : " + 
secondExchange.getExchangeId());
+
+        // Check 1 : Successful roundtrip.
+        LOG.debug("Acknowledgement Test : Stage 1");
+        receiveResult.reset();
+        fail = false;
+        receiveResult.expectedMessageCount(1);
+        
receiveResult.expectedBodiesReceivedInAnyOrder(firstExchange.getIn().getBody());
+        producer.send(firstExchange);
+        receiveResult.assertIsSatisfied(3000);
+
+        // Check 2 : Failure for the second message.
+        LOG.debug("Acknowledgement Test : Stage 2");
+        receiveResult.reset();
+        fail = true;
+        receiveResult.expectedMessageCount(0);
+        producer.send(secondExchange);
+        receiveResult.assertIsSatisfied(3000);
+
+        // Check 3 : Success for the second message.
+        LOG.debug("Acknowledgement Test : Stage 3");
+        receiveResult.reset();
+        fail = false;
+        receiveResult.expectedMessageCount(1);
+        
receiveResult.expectedBodiesReceivedInAnyOrder(secondExchange.getIn().getBody());
+        receiveResult.assertIsSatisfied(3000);
+    }
+}
diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
similarity index 74%
copy from 
components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
copy to 
components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
index 4ba5e8a467d..4287d3c3e14 100644
--- 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java
@@ -28,16 +28,16 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ManualAcknowledgementIT extends PubsubTestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ManualAcknowledgementIT.class);
+public class ManualAcknowledgementAsyncIT extends PubsubTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ManualAcknowledgementAsyncIT.class);
 
-    private static final String TOPIC_NAME = "manualAcknowledgeTopic";
-    private static final String SUBSCRIPTION_NAME = 
"manualAcknowledgeSubscription";
-    private static final String SYNC_ROUTE_ID = 
"receive-from-subscription-sync";
+    private static final String TOPIC_NAME = "manualAcknowledgeAsyncTopic";
+    private static final String SUBSCRIPTION_NAME = 
"manualAcknowledgeAsyncSubscription";
+    private static final String ROUTE_ID = "receive-from-subscription";
     private static Boolean ack = true;
 
-    @EndpointInject("mock:receiveResultSync")
-    private MockEndpoint receiveResultSync;
+    @EndpointInject("mock:receiveResult")
+    private MockEndpoint receiveResult;
 
     @Produce("direct:in")
     private ProducerTemplate producer;
@@ -56,16 +56,16 @@ public class ManualAcknowledgementIT extends 
PubsubTestSupport {
                         .routeId("send-to-topic")
                         .to("google-pubsub:{{project.id}}:" + TOPIC_NAME);
 
-                from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=true&ackMode=NONE")
+                from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=false&ackMode=NONE")
                         .autoStartup(false)
-                        .routeId(SYNC_ROUTE_ID)
-                        .to("mock:receiveResultSync")
+                        .routeId(ROUTE_ID)
+                        .to("mock:receiveResult")
                         .process(exchange -> {
                             GooglePubsubAcknowledge acknowledge
                                     = 
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
                                             GooglePubsubAcknowledge.class);
 
-                            if (ManualAcknowledgementIT.ack) {
+                            if (ManualAcknowledgementAsyncIT.ack) {
                                 acknowledge.ack(exchange);
                             } else {
                                 LOG.debug("Nack!");
@@ -81,20 +81,18 @@ public class ManualAcknowledgementIT extends 
PubsubTestSupport {
         // 2. Synchronous consumer with manual acknowledgement.
         // Message should only be received once.
         producer.sendBody("Testing!");
-        receiveResultSync.expectedMessageCount(1);
-        context.getRouteController().startRoute(SYNC_ROUTE_ID);
-        receiveResultSync.assertIsSatisfied(3000);
-        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+        receiveResult.expectedMessageCount(1);
+        context.getRouteController().startRoute(ROUTE_ID);
+        receiveResult.assertIsSatisfied(3000);
+
+        receiveResult.reset();
 
-        receiveResultSync.reset();
         ack = false;
 
         // 4. Synchronous consumer with manual negative-acknowledgement.
         // Message should be continuously redelivered after being nacked.
-        producer.sendBody("Testing!");
-        receiveResultSync.expectedMinimumMessageCount(3);
-        context.getRouteController().startRoute(SYNC_ROUTE_ID);
-        receiveResultSync.assertIsSatisfied(3000);
-        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+        producer.sendBody("Testing2!");
+        receiveResult.expectedMinimumMessageCount(3);
+        receiveResult.assertIsSatisfied(3000);
     }
 }
diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
index 4ba5e8a467d..a40fad3ae1c 100644
--- 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java
@@ -33,11 +33,11 @@ public class ManualAcknowledgementIT extends 
PubsubTestSupport {
 
     private static final String TOPIC_NAME = "manualAcknowledgeTopic";
     private static final String SUBSCRIPTION_NAME = 
"manualAcknowledgeSubscription";
-    private static final String SYNC_ROUTE_ID = 
"receive-from-subscription-sync";
+    private static final String ROUTE_ID = "receive-from-subscription";
     private static Boolean ack = true;
 
-    @EndpointInject("mock:receiveResultSync")
-    private MockEndpoint receiveResultSync;
+    @EndpointInject("mock:receiveResult")
+    private MockEndpoint receiveResult;
 
     @Produce("direct:in")
     private ProducerTemplate producer;
@@ -58,8 +58,8 @@ public class ManualAcknowledgementIT extends 
PubsubTestSupport {
 
                 from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + 
"?synchronousPull=true&ackMode=NONE")
                         .autoStartup(false)
-                        .routeId(SYNC_ROUTE_ID)
-                        .to("mock:receiveResultSync")
+                        .routeId(ROUTE_ID)
+                        .to("mock:receiveResult")
                         .process(exchange -> {
                             GooglePubsubAcknowledge acknowledge
                                     = 
exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE,
@@ -81,20 +81,17 @@ public class ManualAcknowledgementIT extends 
PubsubTestSupport {
         // 2. Synchronous consumer with manual acknowledgement.
         // Message should only be received once.
         producer.sendBody("Testing!");
-        receiveResultSync.expectedMessageCount(1);
-        context.getRouteController().startRoute(SYNC_ROUTE_ID);
-        receiveResultSync.assertIsSatisfied(3000);
-        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+        receiveResult.expectedMessageCount(1);
+        context.getRouteController().startRoute(ROUTE_ID);
+        receiveResult.assertIsSatisfied(3000);
 
-        receiveResultSync.reset();
+        receiveResult.reset();
         ack = false;
 
         // 4. Synchronous consumer with manual negative-acknowledgement.
         // Message should be continuously redelivered after being nacked.
         producer.sendBody("Testing!");
-        receiveResultSync.expectedMinimumMessageCount(3);
-        context.getRouteController().startRoute(SYNC_ROUTE_ID);
-        receiveResultSync.assertIsSatisfied(3000);
-        context.getRouteController().stopRoute(SYNC_ROUTE_ID);
+        receiveResult.expectedMinimumMessageCount(3);
+        receiveResult.assertIsSatisfied(3000);
     }
 }

Reply via email to