This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push: new 003a1b8 Improve Kafka test coverage Fixes #2627 003a1b8 is described below commit 003a1b87c640ad0b42c5504fd97b59ae741c3964 Author: Zineb Bendhiba <bendhiba.zi...@gmail.com> AuthorDate: Wed Sep 8 15:43:39 2021 +0200 Improve Kafka test coverage Fixes #2627 --- integration-tests/kafka/pom.xml | 22 +++++ .../component/kafka/CamelKafkaResource.java | 72 ++++++++++++++ .../quarkus/component/kafka/CamelKafkaRoutes.java | 48 +++++++++ .../component/kafka/CounterRoutePolicy.java | 36 +++++++ .../component/kafka/CustomHeaderDeserializer.java | 39 ++++++++ .../component/kafka/model/KafkaMessage.java | 49 ++++++++++ .../camel/quarkus/component/kafka/model/Price.java | 46 +++++++++ .../quarkus/component/kafka/it/CamelKafkaTest.java | 108 +++++++++++++++++++++ 8 files changed, 420 insertions(+) diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml index 5a49da4..0940faa 100644 --- a/integration-tests/kafka/pom.xml +++ b/integration-tests/kafka/pom.xml @@ -53,6 +53,10 @@ </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-integration-tests-support-kafka</artifactId> </dependency> <dependency> @@ -75,6 +79,11 @@ <artifactId>kafka</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> <dependency> @@ -129,6 +138,19 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-seda-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <profiles> diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java index ed46943..752bcca 100644 --- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java @@ -16,6 +16,7 @@ */ package org.apache.camel.quarkus.component.kafka; +import java.math.BigInteger; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -39,16 +40,22 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.camel.CamelContext; +import org.apache.camel.ConsumerTemplate; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.camel.component.kafka.KafkaClientFactory; +import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.quarkus.component.kafka.model.KafkaMessage; +import org.apache.camel.quarkus.component.kafka.model.Price; +import org.apache.camel.spi.RouteController; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.internals.RecordHeader; @Path("/kafka") @ApplicationScoped @@ -68,6 +75,9 @@ public class CamelKafkaResource { @Inject ProducerTemplate producerTemplate; + @Inject + ConsumerTemplate consumerTemplate; + @Path("/custom/client/factory/missing") @GET @Produces(MediaType.TEXT_PLAIN) @@ -127,4 +137,66 @@ public class CamelKafkaResource { .map(m -> m.getBody(String.class)) .collect(Collectors.toList()); } + + @Path("/foo/{action}") + @POST + public Response modifyFooConsumerState(@PathParam("action") String action) throws Exception { + RouteController controller = context.getRouteController(); + if (action.equals("start")) { + controller.startRoute("foo"); + } else if (action.equals("stop")) { + controller.stopRoute("foo"); + } else { + throw new IllegalArgumentException("Unknown action: " + action); + } + return Response.ok().build(); + } + + @Path("/seda/{queue}") + @GET + public String getSedaMessage(@PathParam("queue") String queueName) { + return consumerTemplate.receiveBody(String.format("seda:%s", queueName), 10000, String.class); + } + + @Path("price/{key}") + @POST + public Response postPrice(@PathParam("key") Integer key, Double price) { + String routeURI = "kafka:test-serializer?autoOffsetReset=earliest&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer" + + + "&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer"; + producerTemplate.sendBodyAndHeader(routeURI, price, KafkaConstants.KEY, key); + return Response.ok().build(); + } + + @Path("price") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Price getPrice() { + Exchange exchange = consumerTemplate.receive("seda:serializer", 10000); + Integer key = exchange.getMessage().getHeader(KafkaConstants.KEY, Integer.class); + Double price = exchange.getMessage().getBody(Double.class); + return new Price(key, price); + } + + @Path("propagate/{id}") + @POST + public Response postMessageWithHeader(@PathParam("id") Integer id, String message) { + try (Producer<Integer, String> producer = new KafkaProducer<>(producerProperties)) { + ProducerRecord data = new ProducerRecord<>("test-propagation", id, message); + data.headers().add(new RecordHeader("id", BigInteger.valueOf(id).toByteArray())); + producer.send(data); + } + return Response.ok().build(); + } + + @Path("propagate") + @GET + @Produces(MediaType.APPLICATION_JSON) + public KafkaMessage getKafkaMessage() { + Exchange exchange = consumerTemplate.receive("seda:propagation", 10000); + String id = exchange.getMessage().getHeader("id", String.class); + String message = exchange.getMessage().getBody(String.class); + return new KafkaMessage(id, message); + } + } diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java index 65e752e..85b2dfa 100644 --- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java @@ -21,10 +21,21 @@ import javax.enterprise.inject.Produces; import javax.inject.Named; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.component.kafka.KafkaManualCommit; import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.eclipse.microprofile.config.inject.ConfigProperty; public class CamelKafkaRoutes extends RouteBuilder { + + private final static String KAFKA_CONSUMER_MANUAL_COMMIT = "kafka:manual-commit-topic" + + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false" + + "&allowManualCommit=true&autoOffsetReset=earliest"; + + private final static String SEDA_FOO = "seda:foo"; + private final static String SEDA_SERIALIZER = "seda:serializer"; + private final static String SEDA_HEADER_PROPAGATION = "seda:propagation"; + @ConfigProperty(name = "camel.component.kafka.brokers") String brokers; @@ -35,6 +46,13 @@ public class CamelKafkaRoutes extends RouteBuilder { return new KafkaIdempotentRepository("idempotent-topic", brokers); } + @Produces + @ApplicationScoped + @Named("customHeaderDeserializer") + CustomHeaderDeserializer customHeaderDeserializer() { + return new CustomHeaderDeserializer(); + } + @Override public void configure() throws Exception { from("kafka:inbound?autoOffsetReset=earliest") @@ -46,5 +64,35 @@ public class CamelKafkaRoutes extends RouteBuilder { .messageIdRepositoryRef("kafkaIdempotentRepository") .to("mock:idempotent-results") .end(); + + CounterRoutePolicy counterRoutePolicy = new CounterRoutePolicy(); + + // Kafka consumer that use Manual commit + // it manually commits only once every 2 messages received, so that we could test redelivery of uncommitted messages + from(KAFKA_CONSUMER_MANUAL_COMMIT) + .routeId("foo") + .routePolicy(counterRoutePolicy) + .to(SEDA_FOO) + .process(e -> { + int counter = counterRoutePolicy.getCounter(); + if (counter % 2 != 0) { + KafkaManualCommit manual = e.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, + KafkaManualCommit.class); + manual.commitSync(); + } + }); + + // By default, keyDeserializer & valueDeserializer == org.apache.kafka.common.serialization.StringDeserializer + // and valueSerializer & keySerializer == org.apache.kafka.common.serialization.StringSerializer + // the idea here is to test setting different kinds of Deserializers + from("kafka:test-serializer?autoOffsetReset=earliest" + + "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" + + "&valueDeserializer=org.apache.kafka.common.serialization.DoubleDeserializer") + .to(SEDA_SERIALIZER); + + // Header Propagation using CustomHeaderDeserialize + from("kafka:test-propagation?headerDeserializer=#customHeaderDeserializer") + .to(SEDA_HEADER_PROPAGATION); + } } diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java new file mode 100644 index 0000000..3d37b65 --- /dev/null +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.Exchange; +import org.apache.camel.Route; +import org.apache.camel.support.RoutePolicySupport; + +public class CounterRoutePolicy extends RoutePolicySupport { + private AtomicInteger counter = new AtomicInteger(); + + @Override + public void onExchangeBegin(Route route, Exchange exchange) { + exchange.setProperty("counter", counter.incrementAndGet()); + } + + public int getCounter() { + return counter.get(); + } +} diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java new file mode 100644 index 0000000..21993f4 --- /dev/null +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka; + +import java.math.BigInteger; + +import io.quarkus.runtime.annotations.RegisterForReflection; +import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; + +@RegisterForReflection +public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer { + + public CustomHeaderDeserializer() { + } + + @Override + public Object deserialize(String key, byte[] value) { + if (key.equals("id")) { + BigInteger bi = new BigInteger(value); + return String.valueOf(bi.longValue()); + } else { + return super.deserialize(key, value); + } + } +} diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java new file mode 100644 index 0000000..78faebe --- /dev/null +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class KafkaMessage { + String id; + String message; + + public KafkaMessage() { + } + + public KafkaMessage(String id, String message) { + this.id = id; + this.message = message; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java new file mode 100644 index 0000000..9e9a5be --- /dev/null +++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Price { + Integer key; + Double price; + + public Price(Integer key, Double price) { + this.key = key; + this.price = price; + } + + public Integer getKey() { + return key; + } + + public void setKey(Integer key) { + this.key = key; + } + + public Double getPrice() { + return price; + } + + public void setPrice(Double price) { + this.price = price; + } +} diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java index 6e0fdaa..627e5c5 100644 --- a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java +++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java @@ -18,6 +18,7 @@ package org.apache.camel.quarkus.component.kafka.it; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; @@ -25,10 +26,12 @@ import io.restassured.RestAssured; import io.restassured.http.ContentType; import io.restassured.path.json.JsonPath; import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -86,4 +89,109 @@ public class CamelKafkaTest { .statusCode(200) .body(is("true")); } + + @Test + void testManualCommit() { + String body1 = UUID.randomUUID().toString(); + + // test consuming first message with manual auto-commit + // send message that should be consumed by route with routeId = foo + given() + .contentType("text/plain") + .body(body1) + .post("/kafka/manual-commit-topic") + .then() + .statusCode(200); + + // make sure the message has been consumed + given() + .contentType("text/plain") + .body(body1) + .get("/kafka/seda/foo") + .then() + .body(equalTo(body1)); + + String body2 = UUID.randomUUID().toString(); + + given() + .contentType("text/plain") + .body(body2) + .post("/kafka/manual-commit-topic") + .then() + .statusCode(200); + + Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS).until(() -> { + + // make sure the message has been consumed + String result = given() + .contentType("text/plain") + .get("/kafka/seda/foo") + .asString(); + return body2.equals(result); + }); + + // stop foo route + given() + .contentType("text/plain") + .body(body1) + .post("/kafka/foo/stop") + .then() + .statusCode(200); + + // start again the foo route + given() + .contentType("text/plain") + .body(body1) + .post("/kafka/foo/start") + .then() + .statusCode(200); + + // Make sure the second message is redelivered + Awaitility.await().pollInterval(100, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS).until(() -> { + + // make sure the message has been consumed + String result = given() + .contentType("text/plain") + .get("/kafka/seda/foo") + .asString(); + return body2.equals(result); + }); + } + + @Test + void testSerializers() { + given() + .contentType("text/json") + .body(95.59F) + .post("/kafka/price/1") + .then() + .statusCode(200); + + // make sure the message has been consumed + given() + .contentType("text/json") + .get("/kafka/price") + .then() + .body("key", equalTo(1)) + .body("price", equalTo(95.59F)); + } + + @Test + void testHeaderPropagation() throws InterruptedException { + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("hello world") + .post("/kafka/propagate/5") + .then() + .statusCode(200); + + // make sure the message has been consumed, and that the id put in the header has been propagated + given() + .contentType("text/json") + .get("/kafka/propagate") + .then() + .body("id", equalTo("5")) + .body("message", equalTo("hello world")); + } }