This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 003a1b8  Improve Kafka test coverage Fixes #2627
003a1b8 is described below

commit 003a1b87c640ad0b42c5504fd97b59ae741c3964
Author: Zineb Bendhiba <bendhiba.zi...@gmail.com>
AuthorDate: Wed Sep 8 15:43:39 2021 +0200

    Improve Kafka test coverage
    Fixes #2627
---
 integration-tests/kafka/pom.xml                    |  22 +++++
 .../component/kafka/CamelKafkaResource.java        |  72 ++++++++++++++
 .../quarkus/component/kafka/CamelKafkaRoutes.java  |  48 +++++++++
 .../component/kafka/CounterRoutePolicy.java        |  36 +++++++
 .../component/kafka/CustomHeaderDeserializer.java  |  39 ++++++++
 .../component/kafka/model/KafkaMessage.java        |  49 ++++++++++
 .../camel/quarkus/component/kafka/model/Price.java |  46 +++++++++
 .../quarkus/component/kafka/it/CamelKafkaTest.java | 108 +++++++++++++++++++++
 8 files changed, 420 insertions(+)

diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 5a49da4..0940faa 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -53,6 +53,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-seda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
         </dependency>
         <dependency>
@@ -75,6 +79,11 @@
             <artifactId>kafka</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
         <dependency>
@@ -129,6 +138,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-seda-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
index ed46943..752bcca 100644
--- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.quarkus.component.kafka;
 
+import java.math.BigInteger;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -39,16 +40,22 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.kafka.KafkaClientFactory;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.quarkus.component.kafka.model.KafkaMessage;
+import org.apache.camel.quarkus.component.kafka.model.Price;
+import org.apache.camel.spi.RouteController;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeader;
 
 @Path("/kafka")
 @ApplicationScoped
@@ -68,6 +75,9 @@ public class CamelKafkaResource {
     @Inject
     ProducerTemplate producerTemplate;
 
+    @Inject
+    ConsumerTemplate consumerTemplate;
+
     @Path("/custom/client/factory/missing")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
@@ -127,4 +137,66 @@ public class CamelKafkaResource {
                 .map(m -> m.getBody(String.class))
                 .collect(Collectors.toList());
     }
+
+    @Path("/foo/{action}")
+    @POST
+    public Response modifyFooConsumerState(@PathParam("action") String action) throws Exception {
+        RouteController controller = context.getRouteController();
+        if (action.equals("start")) {
+            controller.startRoute("foo");
+        } else if (action.equals("stop")) {
+            controller.stopRoute("foo");
+        } else {
+            throw new IllegalArgumentException("Unknown action: " + action);
+        }
+        return Response.ok().build();
+    }
+
+    @Path("/seda/{queue}")
+    @GET
+    public String getSedaMessage(@PathParam("queue") String queueName) {
+        return consumerTemplate.receiveBody(String.format("seda:%s", queueName), 10000, String.class);
+    }
+
+    @Path("price/{key}")
+    @POST
+    public Response postPrice(@PathParam("key") Integer key, Double price) {
+        String routeURI = "kafka:test-serializer?autoOffsetReset=earliest&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer"
+                +
+                "&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer";
+        producerTemplate.sendBodyAndHeader(routeURI, price, KafkaConstants.KEY, key);
+        return Response.ok().build();
+    }
+
+    @Path("price")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public Price getPrice() {
+        Exchange exchange = consumerTemplate.receive("seda:serializer", 10000);
+        Integer key = exchange.getMessage().getHeader(KafkaConstants.KEY, Integer.class);
+        Double price = exchange.getMessage().getBody(Double.class);
+        return new Price(key, price);
+    }
+
+    @Path("propagate/{id}")
+    @POST
+    public Response postMessageWithHeader(@PathParam("id") Integer id, String message) {
+        try (Producer<Integer, String> producer = new KafkaProducer<>(producerProperties)) {
+            ProducerRecord<Integer, String> data = new ProducerRecord<>("test-propagation", id, message);
+            data.headers().add(new RecordHeader("id", BigInteger.valueOf(id).toByteArray()));
+            producer.send(data);
+        }
+        return Response.ok().build();
+    }
+
+    @Path("propagate")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public KafkaMessage getKafkaMessage() {
+        Exchange exchange = consumerTemplate.receive("seda:propagation", 10000);
+        String id = exchange.getMessage().getHeader("id", String.class);
+        String message = exchange.getMessage().getBody(String.class);
+        return new KafkaMessage(id, message);
+    }
+
 }
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
index 65e752e..85b2dfa 100644
--- a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
@@ -21,10 +21,21 @@ import javax.enterprise.inject.Produces;
 import javax.inject.Named;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaManualCommit;
 import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 
 public class CamelKafkaRoutes extends RouteBuilder {
+
+    private final static String KAFKA_CONSUMER_MANUAL_COMMIT = "kafka:manual-commit-topic"
+            + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+            + "&allowManualCommit=true&autoOffsetReset=earliest";
+
+    private final static String SEDA_FOO = "seda:foo";
+    private final static String SEDA_SERIALIZER = "seda:serializer";
+    private final static String SEDA_HEADER_PROPAGATION = "seda:propagation";
+
     @ConfigProperty(name = "camel.component.kafka.brokers")
     String brokers;
 
@@ -35,6 +46,13 @@ public class CamelKafkaRoutes extends RouteBuilder {
         return new KafkaIdempotentRepository("idempotent-topic", brokers);
     }
 
+    @Produces
+    @ApplicationScoped
+    @Named("customHeaderDeserializer")
+    CustomHeaderDeserializer customHeaderDeserializer() {
+        return new CustomHeaderDeserializer();
+    }
+
     @Override
     public void configure() throws Exception {
         from("kafka:inbound?autoOffsetReset=earliest")
@@ -46,5 +64,35 @@ public class CamelKafkaRoutes extends RouteBuilder {
                 .messageIdRepositoryRef("kafkaIdempotentRepository")
                 .to("mock:idempotent-results")
                 .end();
+
+        CounterRoutePolicy counterRoutePolicy = new CounterRoutePolicy();
+
+        // Kafka consumer that uses manual commit.
+        // It commits only on every other message received, so that redelivery of uncommitted messages can be tested.
+        from(KAFKA_CONSUMER_MANUAL_COMMIT)
+                .routeId("foo")
+                .routePolicy(counterRoutePolicy)
+                .to(SEDA_FOO)
+                .process(e -> {
+                    int counter = counterRoutePolicy.getCounter();
+                    if (counter % 2 != 0) {
+                        KafkaManualCommit manual = e.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
+                                KafkaManualCommit.class);
+                        manual.commitSync();
+                    }
+                });
+
+        // By default, keyDeserializer and valueDeserializer are org.apache.kafka.common.serialization.StringDeserializer,
+        // and keySerializer and valueSerializer are org.apache.kafka.common.serialization.StringSerializer.
+        // This route tests configuring different kinds of deserializers.
+        from("kafka:test-serializer?autoOffsetReset=earliest" +
+                "&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" +
+                "&valueDeserializer=org.apache.kafka.common.serialization.DoubleDeserializer")
+                        .to(SEDA_SERIALIZER);
+
+        // Header propagation using CustomHeaderDeserializer
+        from("kafka:test-propagation?headerDeserializer=#customHeaderDeserializer")
+                .to(SEDA_HEADER_PROPAGATION);
+
     }
 }
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java
new file mode 100644
index 0000000..3d37b65
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.support.RoutePolicySupport;
+
+public class CounterRoutePolicy extends RoutePolicySupport {
+    private AtomicInteger counter = new AtomicInteger();
+
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange) {
+        exchange.setProperty("counter", counter.incrementAndGet());
+    }
+
+    public int getCounter() {
+        return counter.get();
+    }
+}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
new file mode 100644
index 0000000..21993f4
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.math.BigInteger;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+
+@RegisterForReflection
+public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+
+    public CustomHeaderDeserializer() {
+    }
+
+    @Override
+    public Object deserialize(String key, byte[] value) {
+        if (key.equals("id")) {
+            BigInteger bi = new BigInteger(value);
+            return String.valueOf(bi.longValue());
+        } else {
+            return super.deserialize(key, value);
+        }
+    }
+}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
new file mode 100644
index 0000000..78faebe
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class KafkaMessage {
+    String id;
+    String message;
+
+    public KafkaMessage() {
+    }
+
+    public KafkaMessage(String id, String message) {
+        this.id = id;
+        this.message = message;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+}
diff --git a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
new file mode 100644
index 0000000..9e9a5be
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class Price {
+    Integer key;
+    Double price;
+
+    public Price(Integer key, Double price) {
+        this.key = key;
+        this.price = price;
+    }
+
+    public Integer getKey() {
+        return key;
+    }
+
+    public void setKey(Integer key) {
+        this.key = key;
+    }
+
+    public Double getPrice() {
+        return price;
+    }
+
+    public void setPrice(Double price) {
+        this.price = price;
+    }
+}
diff --git a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
index 6e0fdaa..627e5c5 100644
--- a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
+++ b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.quarkus.component.kafka.it;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
@@ -25,10 +26,12 @@ import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 import io.restassured.path.json.JsonPath;
 import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import static io.restassured.RestAssured.given;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -86,4 +89,109 @@ public class CamelKafkaTest {
                 .statusCode(200)
                 .body(is("true"));
     }
+
+    @Test
+    void testManualCommit() {
+        String body1 = UUID.randomUUID().toString();
+
+        // test consuming the first message with manual commit
+        // send message that should be consumed by route with routeId = foo
+        given()
+                .contentType("text/plain")
+                .body(body1)
+                .post("/kafka/manual-commit-topic")
+                .then()
+                .statusCode(200);
+
+        // make sure the message has been consumed
+        given()
+                .contentType("text/plain")
+                .body(body1)
+                .get("/kafka/seda/foo")
+                .then()
+                .body(equalTo(body1));
+
+        String body2 = UUID.randomUUID().toString();
+
+        given()
+                .contentType("text/plain")
+                .body(body2)
+                .post("/kafka/manual-commit-topic")
+                .then()
+                .statusCode(200);
+
+        Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS).until(() -> {
+
+            // make sure the message has been consumed
+            String result = given()
+                    .contentType("text/plain")
+                    .get("/kafka/seda/foo")
+                    .asString();
+            return body2.equals(result);
+        });
+
+        // stop foo route
+        given()
+                .contentType("text/plain")
+                .body(body1)
+                .post("/kafka/foo/stop")
+                .then()
+                .statusCode(200);
+
+        // start the foo route again
+        given()
+                .contentType("text/plain")
+                .body(body1)
+                .post("/kafka/foo/start")
+                .then()
+                .statusCode(200);
+
+        // Make sure the second message is redelivered
+        Awaitility.await().pollInterval(100, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS).until(() -> {
+
+            // make sure the message has been consumed
+            String result = given()
+                    .contentType("text/plain")
+                    .get("/kafka/seda/foo")
+                    .asString();
+            return body2.equals(result);
+        });
+    }
+
+    @Test
+    void testSerializers() {
+        given()
+                .contentType("text/json")
+                .body(95.59F)
+                .post("/kafka/price/1")
+                .then()
+                .statusCode(200);
+
+        // make sure the message has been consumed
+        given()
+                .contentType("text/json")
+                .get("/kafka/price")
+                .then()
+                .body("key", equalTo(1))
+                .body("price", equalTo(95.59F));
+    }
+
+    @Test
+    void testHeaderPropagation() throws InterruptedException {
+        given()
+                .contentType(ContentType.JSON)
+                .accept(ContentType.JSON)
+                .body("hello world")
+                .post("/kafka/propagate/5")
+                .then()
+                .statusCode(200);
+
+        // make sure the message has been consumed, and that the id put in the header has been propagated
+        given()
+                .contentType("text/json")
+                .get("/kafka/propagate")
+                .then()
+                .body("id", equalTo("5"))
+                .body("message", equalTo("hello world"));
+    }
 }
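
For context on the manual-commit behaviour exercised by testManualCommit above, here is a minimal standalone sketch of the pattern the new "foo" route relies on; the topic name and consumer options below are illustrative placeholders, not the exact test configuration:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaManualCommit;

public class ManualCommitSketch extends RouteBuilder {
    @Override
    public void configure() {
        // Auto commit is disabled and manual commit is allowed, so the offset is only
        // persisted when commitSync() is called; records consumed but not committed
        // are redelivered after the consumer route is restarted.
        from("kafka:example-topic?groupId=example-group&autoCommitEnable=false"
                + "&allowManualCommit=true&autoOffsetReset=earliest")
                .process(exchange -> {
                    KafkaManualCommit manual = exchange.getMessage()
                            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                    if (manual != null) {
                        manual.commitSync();
                    }
                });
    }
}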
