This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 339bbb9fdb2 camel-google-pubsub: Fix test 339bbb9fdb2 is described below commit 339bbb9fdb2a04039f3b6336a21f326139c8fd85 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jun 12 10:55:19 2025 +0200 camel-google-pubsub: Fix test --- .../pubsub/integration/AcknowledgementAsyncIT.java | 119 +++++++++++++++++++++ ...ntIT.java => ManualAcknowledgementAsyncIT.java} | 40 ++++--- .../integration/ManualAcknowledgementIT.java | 25 ++--- 3 files changed, 149 insertions(+), 35 deletions(-) diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java new file mode 100644 index 00000000000..9a29452c685 --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementAsyncIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub.integration; + +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.pubsub.PubsubTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AcknowledgementAsyncIT extends PubsubTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AcknowledgementAsyncIT.class); + + private static final String TOPIC_NAME = "failureSingleAsync"; + private static final String SUBSCRIPTION_NAME = "failureSubAsync"; + private static Boolean fail = false; + + @EndpointInject("direct:in") + private Endpoint directIn; + + @EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME) + private Endpoint pubsubTopic; + + @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=false") + private Endpoint pubsubSubscription; + + @EndpointInject("mock:receiveResult") + private MockEndpoint receiveResult; + + @Produce("direct:in") + private ProducerTemplate producer; + + @Override + public void createTopicSubscription() { + createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from(directIn).routeId("Send_to_Fail").to(pubsubTopic); + + from(pubsubSubscription).routeId("Fail_Receive").autoStartup(true).process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + if (AcknowledgementAsyncIT.fail) { + throw new Exception("fail"); + } + } + }).to(receiveResult); + } + }; + } + + /** + * Testing acknowledgements. Three checks to be performed. Check 1 : Successful round trip. Message received and + * acknowledged. If the ACK fails for the first message, it will be delivered again for the second check and the + * body comparison will fail. Check 2 : Failure. As the route throws and exception and the message is NACK'ed. The + * message should remain in the PubSub Subscription for the third check. Check 3 : Success for the second message. + * The message received should match the second message sent. + */ + @Test + public void singleMessage() throws Exception { + + Exchange firstExchange = new DefaultExchange(context); + Exchange secondExchange = new DefaultExchange(context); + + firstExchange.getIn().setBody("SUCCESS : " + firstExchange.getExchangeId()); + secondExchange.getIn().setBody("fail : " + secondExchange.getExchangeId()); + + // Check 1 : Successful roundtrip. + LOG.debug("Acknowledgement Test : Stage 1"); + receiveResult.reset(); + fail = false; + receiveResult.expectedMessageCount(1); + receiveResult.expectedBodiesReceivedInAnyOrder(firstExchange.getIn().getBody()); + producer.send(firstExchange); + receiveResult.assertIsSatisfied(3000); + + // Check 2 : Failure for the second message. + LOG.debug("Acknowledgement Test : Stage 2"); + receiveResult.reset(); + fail = true; + receiveResult.expectedMessageCount(0); + producer.send(secondExchange); + receiveResult.assertIsSatisfied(3000); + + // Check 3 : Success for the second message. + LOG.debug("Acknowledgement Test : Stage 3"); + receiveResult.reset(); + fail = false; + receiveResult.expectedMessageCount(1); + receiveResult.expectedBodiesReceivedInAnyOrder(secondExchange.getIn().getBody()); + receiveResult.assertIsSatisfied(3000); + } +} diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java similarity index 74% copy from components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java copy to components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java index 4ba5e8a467d..4287d3c3e14 100644 --- a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementAsyncIT.java @@ -28,16 +28,16 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ManualAcknowledgementIT extends PubsubTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(ManualAcknowledgementIT.class); +public class ManualAcknowledgementAsyncIT extends PubsubTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ManualAcknowledgementAsyncIT.class); - private static final String TOPIC_NAME = "manualAcknowledgeTopic"; - private static final String SUBSCRIPTION_NAME = "manualAcknowledgeSubscription"; - private static final String SYNC_ROUTE_ID = "receive-from-subscription-sync"; + private static final String TOPIC_NAME = "manualAcknowledgeAsyncTopic"; + private static final String SUBSCRIPTION_NAME = "manualAcknowledgeAsyncSubscription"; + private static final String ROUTE_ID = "receive-from-subscription"; private static Boolean ack = true; - @EndpointInject("mock:receiveResultSync") - private MockEndpoint receiveResultSync; + @EndpointInject("mock:receiveResult") + private MockEndpoint receiveResult; @Produce("direct:in") private ProducerTemplate producer; @@ -56,16 +56,16 @@ public class ManualAcknowledgementIT extends PubsubTestSupport { .routeId("send-to-topic") .to("google-pubsub:{{project.id}}:" + TOPIC_NAME); - from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true&ackMode=NONE") + from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=false&ackMode=NONE") .autoStartup(false) - .routeId(SYNC_ROUTE_ID) - .to("mock:receiveResultSync") + .routeId(ROUTE_ID) + .to("mock:receiveResult") .process(exchange -> { GooglePubsubAcknowledge acknowledge = exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, GooglePubsubAcknowledge.class); - if (ManualAcknowledgementIT.ack) { + if (ManualAcknowledgementAsyncIT.ack) { acknowledge.ack(exchange); } else { LOG.debug("Nack!"); @@ -81,20 +81,18 @@ public class ManualAcknowledgementIT extends PubsubTestSupport { // 2. Synchronous consumer with manual acknowledgement. // Message should only be received once. producer.sendBody("Testing!"); - receiveResultSync.expectedMessageCount(1); - context.getRouteController().startRoute(SYNC_ROUTE_ID); - receiveResultSync.assertIsSatisfied(3000); - context.getRouteController().stopRoute(SYNC_ROUTE_ID); + receiveResult.expectedMessageCount(1); + context.getRouteController().startRoute(ROUTE_ID); + receiveResult.assertIsSatisfied(3000); + + receiveResult.reset(); - receiveResultSync.reset(); ack = false; // 4. Synchronous consumer with manual negative-acknowledgement. // Message should be continuously redelivered after being nacked. - producer.sendBody("Testing!"); - receiveResultSync.expectedMinimumMessageCount(3); - context.getRouteController().startRoute(SYNC_ROUTE_ID); - receiveResultSync.assertIsSatisfied(3000); - context.getRouteController().stopRoute(SYNC_ROUTE_ID); + producer.sendBody("Testing2!"); + receiveResult.expectedMinimumMessageCount(3); + receiveResult.assertIsSatisfied(3000); } } diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java index 4ba5e8a467d..a40fad3ae1c 100644 --- a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/ManualAcknowledgementIT.java @@ -33,11 +33,11 @@ public class ManualAcknowledgementIT extends PubsubTestSupport { private static final String TOPIC_NAME = "manualAcknowledgeTopic"; private static final String SUBSCRIPTION_NAME = "manualAcknowledgeSubscription"; - private static final String SYNC_ROUTE_ID = "receive-from-subscription-sync"; + private static final String ROUTE_ID = "receive-from-subscription"; private static Boolean ack = true; - @EndpointInject("mock:receiveResultSync") - private MockEndpoint receiveResultSync; + @EndpointInject("mock:receiveResult") + private MockEndpoint receiveResult; @Produce("direct:in") private ProducerTemplate producer; @@ -58,8 +58,8 @@ public class ManualAcknowledgementIT extends PubsubTestSupport { from("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME + "?synchronousPull=true&ackMode=NONE") .autoStartup(false) - .routeId(SYNC_ROUTE_ID) - .to("mock:receiveResultSync") + .routeId(ROUTE_ID) + .to("mock:receiveResult") .process(exchange -> { GooglePubsubAcknowledge acknowledge = exchange.getIn().getHeader(GooglePubsubConstants.GOOGLE_PUBSUB_ACKNOWLEDGE, @@ -81,20 +81,17 @@ public class ManualAcknowledgementIT extends PubsubTestSupport { // 2. Synchronous consumer with manual acknowledgement. // Message should only be received once. producer.sendBody("Testing!"); - receiveResultSync.expectedMessageCount(1); - context.getRouteController().startRoute(SYNC_ROUTE_ID); - receiveResultSync.assertIsSatisfied(3000); - context.getRouteController().stopRoute(SYNC_ROUTE_ID); + receiveResult.expectedMessageCount(1); + context.getRouteController().startRoute(ROUTE_ID); + receiveResult.assertIsSatisfied(3000); - receiveResultSync.reset(); + receiveResult.reset(); ack = false; // 4. Synchronous consumer with manual negative-acknowledgement. // Message should be continuously redelivered after being nacked. producer.sendBody("Testing!"); - receiveResultSync.expectedMinimumMessageCount(3); - context.getRouteController().startRoute(SYNC_ROUTE_ID); - receiveResultSync.assertIsSatisfied(3000); - context.getRouteController().stopRoute(SYNC_ROUTE_ID); + receiveResult.expectedMinimumMessageCount(3); + receiveResult.assertIsSatisfied(3000); } }