This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new 78e2052 Added nats native support fixes #1578 78e2052 is described below commit 78e20524af3b9b23ab0655a53f2bc5dc61388bc9 Author: aldettinger <aldettin...@gmail.com> AuthorDate: Wed Sep 2 10:41:38 2020 +0200 Added nats native support fixes #1578 --- docs/modules/ROOT/pages/reference/components.adoc | 4 +- .../ROOT/pages/reference/extensions/nats.adoc | 2 +- docs/modules/ROOT/pages/reference/index.adoc | 4 +- extensions-jvm/pom.xml | 1 - .../nats/deployment/pom.xml | 0 .../component/nats/deployment/NatsProcessor.java | 20 ++-- {extensions-jvm => extensions}/nats/pom.xml | 1 - .../nats/runtime/pom.xml | 0 .../main/resources/META-INF/quarkus-extension.yaml | 0 extensions/pom.xml | 1 + .../nats}/pom.xml | 59 ++++++++-- .../component/nats/it/NatsConfiguration.java | 60 +++++++++++ .../quarkus/component/nats/it/NatsResource.java | 70 ++++++++++++ .../quarkus/component/nats/it/NatsRoutes.java | 42 ++++---- .../camel/quarkus/component/nats/it/NatsIT.java | 16 +-- .../camel/quarkus/component/nats/it/NatsTest.java | 120 +++++++++++++++++++++ .../component/nats/it/NatsTestResource.java | 97 +++++++++++++++++ integration-tests/pom.xml | 1 + tooling/scripts/test-categories.yaml | 1 + 19 files changed, 435 insertions(+), 64 deletions(-) diff --git a/docs/modules/ROOT/pages/reference/components.adoc b/docs/modules/ROOT/pages/reference/components.adoc index 69d1ad2..c7f3157 100644 --- a/docs/modules/ROOT/pages/reference/components.adoc +++ b/docs/modules/ROOT/pages/reference/components.adoc @@ -565,8 +565,8 @@ Stable | 1.0.0 | Interact with MongoDB GridFS. | xref:reference/extensions/mustache.adoc[Mustache] | [.camel-element-artifact]##camel-quarkus-mustache## | [.camel-element-Native]##Native## + Stable | 1.0.0 | Transform messages using a Mustache template. -| xref:reference/extensions/nats.adoc[Nats] | [.camel-element-artifact]##camel-quarkus-nats## | [.camel-element-JVM]##JVM## + -Preview | 1.1.0 | Send and receive messages from NATS messaging system. +| xref:reference/extensions/nats.adoc[Nats] | [.camel-element-artifact]##camel-quarkus-nats## | [.camel-element-Native]##Native## + +Stable | 1.1.0 | Send and receive messages from NATS messaging system. | xref:reference/extensions/netty.adoc[Netty] | [.camel-element-artifact]##camel-quarkus-netty## | [.camel-element-Native]##Native## + Stable | 0.4.0 | Socket level networking using TCP or UDP with the Netty 4.x. diff --git a/docs/modules/ROOT/pages/reference/extensions/nats.adoc b/docs/modules/ROOT/pages/reference/extensions/nats.adoc index 98165d3..53a7e47 100644 --- a/docs/modules/ROOT/pages/reference/extensions/nats.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/nats.adoc @@ -6,7 +6,7 @@ :page-aliases: extensions/nats.adoc [.badges] -[.badge-key]##Since Camel Quarkus##[.badge-version]##1.1.0## [.badge-key]##JVM##[.badge-supported]##supported## [.badge-key]##Native##[.badge-unsupported]##unsupported## +[.badge-key]##Since Camel Quarkus##[.badge-version]##1.1.0## [.badge-key]##JVM##[.badge-supported]##supported## [.badge-key]##Native##[.badge-supported]##supported## Send and receive messages from NATS messaging system. diff --git a/docs/modules/ROOT/pages/reference/index.adoc b/docs/modules/ROOT/pages/reference/index.adoc index d6460fb..8308c71 100644 --- a/docs/modules/ROOT/pages/reference/index.adoc +++ b/docs/modules/ROOT/pages/reference/index.adoc @@ -512,8 +512,8 @@ Stable | 1.0.0 | Interact with MongoDB GridFS. | xref:reference/extensions/mustache.adoc[Mustache] | camel-quarkus-mustache | [.camel-element-Native]##Native## + Stable | 1.0.0 | Transform messages using a Mustache template. -| xref:reference/extensions/nats.adoc[Nats] | camel-quarkus-nats | [.camel-element-JVM]##JVM## + -Preview | 1.1.0 | Send and receive messages from NATS messaging system. +| xref:reference/extensions/nats.adoc[Nats] | camel-quarkus-nats | [.camel-element-Native]##Native## + +Stable | 1.1.0 | Send and receive messages from NATS messaging system. | xref:reference/extensions/netty.adoc[Netty] | camel-quarkus-netty | [.camel-element-Native]##Native## + Stable | 0.4.0 | Socket level networking using TCP or UDP with the Netty 4.x. diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml index 90d8a1f..0b4a4ee 100644 --- a/extensions-jvm/pom.xml +++ b/extensions-jvm/pom.xml @@ -89,7 +89,6 @@ <module>jbpm</module> <module>jooq</module> <module>jsch</module> - <module>nats</module> <module>nitrite</module> <module>ognl</module> <module>openstack</module> diff --git a/extensions-jvm/nats/deployment/pom.xml b/extensions/nats/deployment/pom.xml similarity index 100% rename from extensions-jvm/nats/deployment/pom.xml rename to extensions/nats/deployment/pom.xml diff --git a/extensions-jvm/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java b/extensions/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java similarity index 62% rename from extensions-jvm/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java rename to extensions/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java index eb37ee7..1826178 100644 --- a/extensions-jvm/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java +++ b/extensions/nats/deployment/src/main/java/org/apache/camel/quarkus/component/nats/deployment/NatsProcessor.java @@ -16,17 +16,14 @@ */ package org.apache.camel.quarkus.component.nats.deployment; +import io.nats.client.Options; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.annotations.ExecutionTime; -import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.pkg.steps.NativeBuild; -import org.apache.camel.quarkus.core.JvmOnlyRecorder; -import org.jboss.logging.Logger; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; class NatsProcessor { - private static final Logger LOG = Logger.getLogger(NatsProcessor.class); private static final String FEATURE = "camel-nats"; @BuildStep @@ -34,13 +31,8 @@ class NatsProcessor { return new FeatureBuildItem(FEATURE); } - /** - * Remove this once this extension starts supporting the native mode. - */ - @BuildStep(onlyIf = NativeBuild.class) - @Record(value = ExecutionTime.RUNTIME_INIT) - void warnJvmInNative(JvmOnlyRecorder recorder) { - JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time - recorder.warnJvmInNative(FEATURE); // warn at runtime + @BuildStep + void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> producer) { + producer.produce(new ReflectiveClassBuildItem(false, false, Options.DEFAULT_DATA_PORT_TYPE)); } } diff --git a/extensions-jvm/nats/pom.xml b/extensions/nats/pom.xml similarity index 97% rename from extensions-jvm/nats/pom.xml rename to extensions/nats/pom.xml index 70685b1..1f5f411 100644 --- a/extensions-jvm/nats/pom.xml +++ b/extensions/nats/pom.xml @@ -35,6 +35,5 @@ <modules> <module>deployment</module> <module>runtime</module> - <module>integration-test</module> </modules> </project> diff --git a/extensions-jvm/nats/runtime/pom.xml b/extensions/nats/runtime/pom.xml similarity index 100% rename from extensions-jvm/nats/runtime/pom.xml rename to extensions/nats/runtime/pom.xml diff --git a/extensions-jvm/nats/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/nats/runtime/src/main/resources/META-INF/quarkus-extension.yaml similarity index 100% rename from extensions-jvm/nats/runtime/src/main/resources/META-INF/quarkus-extension.yaml rename to extensions/nats/runtime/src/main/resources/META-INF/quarkus-extension.yaml diff --git a/extensions/pom.xml b/extensions/pom.xml index 3bdc46f..4b35bd5 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -137,6 +137,7 @@ <module>mongodb</module> <module>mongodb-gridfs</module> <module>mustache</module> + <module>nats</module> <module>netty</module> <module>netty-http</module> <module>olingo4</module> diff --git a/extensions-jvm/nats/integration-test/pom.xml b/integration-tests/nats/pom.xml similarity index 62% rename from extensions-jvm/nats/integration-test/pom.xml rename to integration-tests/nats/pom.xml index 38aed9e..faeff4e 100644 --- a/extensions-jvm/nats/integration-test/pom.xml +++ b/integration-tests/nats/pom.xml @@ -18,18 +18,17 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-build-parent-it</artifactId> + <artifactId>camel-quarkus-integration-tests</artifactId> <version>1.1.0-SNAPSHOT</version> - <relativePath>../../../poms/build-parent-it/pom.xml</relativePath> </parent> - <artifactId>camel-quarkus-nats-integration-test</artifactId> - <name>Camel Quarkus :: Nats :: Integration Test</name> + <artifactId>camel-quarkus-integration-test-nats</artifactId> + <name>Camel Quarkus :: Integration Tests :: Nats</name> <description>Integration tests for Camel Quarkus Nats extension</description> <properties> @@ -60,9 +59,17 @@ <artifactId>camel-quarkus-nats</artifactId> </dependency> <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-bean</artifactId> + </dependency> + <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> <!-- test dependencies --> <dependency> @@ -75,6 +82,16 @@ <artifactId>rest-assured</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-testcontainers-support</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -92,4 +109,34 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsConfiguration.java b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsConfiguration.java new file mode 100644 index 0000000..47562f4 --- /dev/null +++ b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsConfiguration.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.nats.it; + +import javax.inject.Named; + +import org.apache.camel.component.nats.NatsComponent; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +public class NatsConfiguration { + + public static final String NATS_BROKER_URL_BASIC_AUTH_CONFIG_KEY = "camel.nats.test.broker-url-basic-auth"; + public static final String NATS_BROKER_URL_NO_AUTH_CONFIG_KEY = "camel.nats.test.broker-url-no-auth"; + public static final String NATS_BROKER_URL_TOKEN_AUTH_CONFIG_KEY = "camel.nats.test.broker-url-token-auth"; + + @ConfigProperty(name = NATS_BROKER_URL_BASIC_AUTH_CONFIG_KEY) + String natsBasicAuthBrokerUrl; + + @ConfigProperty(name = NATS_BROKER_URL_NO_AUTH_CONFIG_KEY) + String natsNoAuthBrokerUrl; + + @ConfigProperty(name = NATS_BROKER_URL_TOKEN_AUTH_CONFIG_KEY) + String natsTokenAuthBrokerUrl; + + @Named + NatsComponent natsBasicAuth() { + NatsComponent component = new NatsComponent(); + component.setServers(natsBasicAuthBrokerUrl); + return component; + } + + @Named + NatsComponent natsNoAuth() { + NatsComponent component = new NatsComponent(); + component.setServers(natsNoAuthBrokerUrl); + return component; + } + + @Named + NatsComponent natsTokenAuth() { + NatsComponent component = new NatsComponent(); + component.setServers(natsTokenAuthBrokerUrl); + return component; + } + +} diff --git a/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java new file mode 100644 index 0000000..423b1e5 --- /dev/null +++ b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.nats.it; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.camel.Body; +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.jboss.logging.Logger; + +@Path("/nats/") +@ApplicationScoped +public class NatsResource { + + private static final Logger LOG = Logger.getLogger(NatsResource.class); + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> msgStore = new ConcurrentHashMap<>(); + + @Inject + ProducerTemplate template; + + @Path("/send") + @POST + @Consumes(MediaType.TEXT_PLAIN) + public void send(@HeaderParam("sendToEndpointUri") String sendToEndpointUri, String message) { + LOG.debugf("Invoking send with (%s, %s)", sendToEndpointUri, message); + template.sendBody(sendToEndpointUri, message); + } + + void storeMessage(Exchange e, @Body String message) { + LOG.debugf("Invoking storeMessage with (%s, %s)", e, message); + msgStore.computeIfAbsent(e.getFromRouteId(), s -> new ConcurrentLinkedQueue<>()).add(message); + } + + @Path("/messages/{route-id}") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Collection<String> getRouteMessages(@PathParam("route-id") String routeId) { + return msgStore.get(routeId); + } + +} diff --git a/extensions-jvm/nats/integration-test/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsRoutes.java similarity index 50% rename from extensions-jvm/nats/integration-test/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java rename to integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsRoutes.java index df49396..3943b4a 100644 --- a/extensions-jvm/nats/integration-test/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsResource.java +++ b/integration-tests/nats/src/main/java/org/apache/camel/quarkus/component/nats/it/NatsRoutes.java @@ -18,34 +18,28 @@ package org.apache.camel.quarkus.component.nats.it; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import org.apache.camel.CamelContext; -import org.jboss.logging.Logger; +import org.apache.camel.builder.RouteBuilder; -@Path("/nats") @ApplicationScoped -public class NatsResource { +public class NatsRoutes extends RouteBuilder { - private static final Logger LOG = Logger.getLogger(NatsResource.class); - - private static final String COMPONENT_NATS = "nats"; @Inject - CamelContext context; - - @Path("/load/component/nats") - @GET - @Produces(MediaType.TEXT_PLAIN) - public Response loadComponentNats() throws Exception { - /* This is an autogenerated test */ - if (context.getComponent(COMPONENT_NATS) != null) { - return Response.ok().build(); - } - LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_NATS); - return Response.status(500, COMPONENT_NATS + " could not be loaded from the Camel context").build(); + NatsResource natsResource; + + @Override + public void configure() { + from("natsBasicAuth:test").routeId("basic-auth").bean(natsResource, "storeMessage"); + from("natsNoAuth:test").routeId("no-auth").bean(natsResource, "storeMessage"); + from("natsTokenAuth:test").routeId("token-auth").bean(natsResource, "storeMessage"); + + from("natsNoAuth:max?maxMessages=2").routeId("2-msg-max").bean(natsResource, "storeMessage"); + + String maxMsgUriPattern = "natsNoAuth:qmax?maxMessages=%s&queueName=q"; + fromF(maxMsgUriPattern, 3).routeId("3-qmsg-max").bean(natsResource, "storeMessage"); + fromF(maxMsgUriPattern, 8).routeId("8-qmsg-max").bean(natsResource, "storeMessage"); + + from("natsNoAuth:request-reply").setBody().simple("${body} => Reply"); + from("natsNoAuth:reply").routeId("reply").bean(natsResource, "storeMessage"); } } diff --git a/extensions-jvm/nats/integration-test/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsIT.java similarity index 71% rename from extensions-jvm/nats/integration-test/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java rename to integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsIT.java index dcbba0d..d82b9fe 100644 --- a/extensions-jvm/nats/integration-test/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java +++ b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsIT.java @@ -16,19 +16,9 @@ */ package org.apache.camel.quarkus.component.nats.it; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; -import org.junit.jupiter.api.Test; +import io.quarkus.test.junit.NativeImageTest; -@QuarkusTest -class NatsTest { - - @Test - public void loadComponentNats() { - /* A simple autogenerated test */ - RestAssured.get("/nats/load/component/nats") - .then() - .statusCode(200); - } +@NativeImageTest +class NatsIT extends NatsTest { } diff --git a/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java new file mode 100644 index 0000000..6cb9154 --- /dev/null +++ b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.nats.it; + +import java.util.concurrent.TimeUnit; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.Header; +import org.junit.jupiter.api.Test; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@QuarkusTestResource(NatsTestResource.class) +@QuarkusTest +class NatsTest { + + @Test + void basicAuthProduceConsumeRoundTripShouldSucceed() { + Header header = new Header("sendToEndpointUri", "natsBasicAuth:test"); + given().when().header(header).body("basic-auth-msg").post("/nats/send").then().statusCode(204); + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/basic-auth").path("size()").equals(1); + }); + + String[] messages = given().get("/nats/messages/basic-auth").then().statusCode(200).extract().as(String[].class); + assertEquals(1, messages.length); + assertEquals("basic-auth-msg", messages[0]); + } + + @Test + void noAuthProduceConsumeRoundTripShouldSucceed() { + Header header = new Header("sendToEndpointUri", "natsNoAuth:test"); + given().when().header(header).body("no-auth-msg").post("/nats/send").then().statusCode(204); + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/no-auth").path("size()").equals(1); + }); + + String[] messages = given().get("/nats/messages/no-auth").then().statusCode(200).extract().as(String[].class); + assertEquals(1, messages.length); + assertEquals("no-auth-msg", messages[0]); + } + + @Test + void tokenAuthProduceConsumeRoundTripShouldSucceed() { + Header header = new Header("sendToEndpointUri", "natsTokenAuth:test"); + given().when().header(header).body("token-auth-msg").post("/nats/send").then().statusCode(204); + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/token-auth").path("size()").equals(1); + }); + + String[] messages = given().get("/nats/messages/token-auth").then().statusCode(200).extract().as(String[].class); + assertEquals(1, messages.length); + assertEquals("token-auth-msg", messages[0]); + } + + @Test + void consumeMaxMessagesShouldRetainFirstTwoMessages() { + Header header = new Header("sendToEndpointUri", "natsNoAuth:max"); + for (int msgNumber = 1; msgNumber <= 10; msgNumber++) { + given().when().header(header).body("msg " + msgNumber).post("/nats/send").then().statusCode(204); + } + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/2-msg-max").path("size()").equals(2); + }); + + String[] messages = given().get("/nats/messages/2-msg-max").then().statusCode(200).extract().as(String[].class); + assertEquals(2, messages.length); + assertEquals("msg 1", messages[0]); + assertEquals("msg 2", messages[1]); + } + + @Test + void consumeMaxQueueMessagesShouldRetainRightNumberOfMessages() { + Header header = new Header("sendToEndpointUri", "natsNoAuth:qmax"); + for (int msgNumber = 1; msgNumber <= 20; msgNumber++) { + given().when().header(header).body("qmsg " + msgNumber).post("/nats/send").then().statusCode(204); + } + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/3-qmsg-max").path("size()").equals(3) + && given().get("/nats/messages/8-qmsg-max").path("size()").equals(8); + }); + } + + @Test + void requestReplyShouldSucceed() { + Header header = new Header("sendToEndpointUri", "natsNoAuth:request-reply?replySubject=reply"); + given().when().header(header).body("Request").post("/nats/send").then().statusCode(204); + + await().atMost(10L, TimeUnit.SECONDS).until(() -> { + return given().get("/nats/messages/reply").path("size()").equals(1); + }); + + String[] messages = given().get("/nats/messages/reply").then().statusCode(200).extract().as(String[].class); + assertEquals(1, messages.length); + assertEquals("Request => Reply", messages[0]); + } + +} diff --git a/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTestResource.java b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTestResource.java new file mode 100644 index 0000000..028aee6 --- /dev/null +++ b/integration-tests/nats/src/test/java/org/apache/camel/quarkus/component/nats/it/NatsTestResource.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.nats.it; + +import java.util.Map; + +import org.apache.camel.quarkus.testcontainers.ContainerResourceLifecycleManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.TestcontainersConfiguration; + +import static org.apache.camel.quarkus.component.nats.it.NatsConfiguration.NATS_BROKER_URL_BASIC_AUTH_CONFIG_KEY; +import static org.apache.camel.quarkus.component.nats.it.NatsConfiguration.NATS_BROKER_URL_NO_AUTH_CONFIG_KEY; +import static org.apache.camel.quarkus.component.nats.it.NatsConfiguration.NATS_BROKER_URL_TOKEN_AUTH_CONFIG_KEY; +import static org.apache.camel.util.CollectionHelper.mapOf; + +public class NatsTestResource implements ContainerResourceLifecycleManager { + + private static final Logger LOG = LoggerFactory.getLogger(NatsTestResource.class); + private static final String BASIC_AUTH_USERNAME = "admin"; + private static final String BASIC_AUTH_PASSWORD = "password"; + private static final String NATS_IMAGE = "nats:2.1.4"; + private static final int NATS_SERVER_PORT = 4222; + private static final String TOKEN_AUTH_TOKEN = "!admin23456"; + + private GenericContainer basicAuthContainer, noAuthContainer, tokenAuthContainer; + + @Override + public Map<String, String> start() { + LOG.info(TestcontainersConfiguration.getInstance().toString()); + + // Start the container needed for the basic authentication test + basicAuthContainer = new GenericContainer(NATS_IMAGE).withExposedPorts(NATS_SERVER_PORT) + .withCommand("-DV", "--user", BASIC_AUTH_USERNAME, "--pass", BASIC_AUTH_PASSWORD) + .waitingFor(Wait.forLogMessage(".*Server is ready.*", 1)); + basicAuthContainer.start(); + String basicAuthIp = basicAuthContainer.getContainerIpAddress(); + Integer basicAuthPort = basicAuthContainer.getMappedPort(NATS_SERVER_PORT); + String basicAuthAuthority = BASIC_AUTH_USERNAME + ":" + BASIC_AUTH_PASSWORD; + String basicAuthBrokerUrl = String.format("%s@%s:%d", basicAuthAuthority, basicAuthIp, basicAuthPort); + + // Start the container needed for tests without authentication + noAuthContainer = new GenericContainer(NATS_IMAGE).withExposedPorts(NATS_SERVER_PORT) + .waitingFor(Wait.forLogMessage(".*Listening for route connections.*", 1)); + noAuthContainer.start(); + String noAuthIp = noAuthContainer.getContainerIpAddress(); + Integer noAuthPort = noAuthContainer.getMappedPort(NATS_SERVER_PORT); + String noAuthBrokerUrl = String.format("%s:%s", noAuthIp, noAuthPort); + + // Start the container needed for the token authentication test + tokenAuthContainer = new GenericContainer(NATS_IMAGE).withExposedPorts(NATS_SERVER_PORT) + .withCommand("-DV", "-auth", TOKEN_AUTH_TOKEN) + .waitingFor(Wait.forLogMessage(".*Server is ready.*", 1)); + tokenAuthContainer.start(); + String tokenAuthIp = tokenAuthContainer.getContainerIpAddress(); + Integer tokenAuthPort = tokenAuthContainer.getMappedPort(NATS_SERVER_PORT); + String tokenAuthBrokerUrl = String.format("%s@%s:%d", TOKEN_AUTH_TOKEN, tokenAuthIp, tokenAuthPort); + + Map<String, String> Properties = mapOf(NATS_BROKER_URL_BASIC_AUTH_CONFIG_KEY, basicAuthBrokerUrl); + Properties.put(NATS_BROKER_URL_NO_AUTH_CONFIG_KEY, noAuthBrokerUrl); + Properties.put(NATS_BROKER_URL_TOKEN_AUTH_CONFIG_KEY, tokenAuthBrokerUrl); + return Properties; + } + + @Override + public void stop() { + stop(basicAuthContainer, "natsBasicAuthContainer"); + stop(noAuthContainer, "natsNoAuthContainer"); + stop(tokenAuthContainer, "natsTokenAuthContainer"); + } + + private void stop(GenericContainer<?> container, String id) { + try { + if (container != null) { + container.stop(); + } + } catch (Exception ex) { + LOG.error("An issue occured while stopping " + id, ex); + } + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 8f8642b..97a3685 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -114,6 +114,7 @@ <module>mock</module> <module>mongodb</module> <module>mustache</module> + <module>nats</module> <module>netty</module> <module>olingo4</module> <module>openapi-java</module> diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml index b148895..81d20b1 100644 --- a/tooling/scripts/test-categories.yaml +++ b/tooling/scripts/test-categories.yaml @@ -83,6 +83,7 @@ messaging-networking1: - amqp - kafka - messaging + - nats - rabbitmq - ftp - http