This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push: new b0ad46a Add test coverage for Kafka with SSL b0ad46a is described below commit b0ad46a2ad09c3192f06910ef88361aae8ad2098 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Wed Apr 21 09:03:35 2021 +0100 Add test coverage for Kafka with SSL Fixes #2484 --- integration-tests/kafka-ssl/README.adoc | 26 ++++ integration-tests/kafka-ssl/pom.xml | 130 +++++++++++++++++++ .../camel/quarkus/kafka/ssl/KafkaSslResource.java | 71 +++++++++++ .../camel/quarkus/kafka/ssl/KafkaSslRoutes.java | 28 ++++ .../camel/quarkus/kafka/ssl/KafkaSupport.java | 87 +++++++++++++ .../src/main/resources/application.properties | 18 +++ .../apache/camel/quarkus/kafka/ssl/KafkaSslIT.java | 23 ++++ .../camel/quarkus/kafka/ssl/KafkaSslTest.java | 55 ++++++++ .../quarkus/kafka/ssl/KafkaSslTestResource.java | 141 +++++++++++++++++++++ .../src/test/resources/config/kafka-keystore.p12 | Bin 0 -> 2451 bytes .../src/test/resources/config/kafka-truststore.p12 | Bin 0 -> 1010 bytes integration-tests/pom.xml | 1 + tooling/scripts/test-categories.yaml | 1 + 13 files changed, 581 insertions(+) diff --git a/integration-tests/kafka-ssl/README.adoc b/integration-tests/kafka-ssl/README.adoc new file mode 100644 index 0000000..ea7e106 --- /dev/null +++ b/integration-tests/kafka-ssl/README.adoc @@ -0,0 +1,26 @@ +== Camel Quarkus Kafka SSL integration tests + +To regenerate the SSL key and trust stores, do the following: + +[source,shell] +---- +cd src/test/resources/config +rm -f *.p12 + +export SECRET=kafkas3cret +export JKS_FILE=kafka-keystore.jks +export JKS_TRUST_FILE=kafka-truststore.jks +export CERT_FILE=localhost.crt +export PKCS_FILE=kafka-keystore.p12 +export PKCS_TRUST_FILE=kafka-truststore.p12 +export PEM_FILE_CERT=kafka-cert.pem +export PEM_FILE_KEY=kafka-key.pem + +keytool -genkey -alias kafka-test-store -keyalg RSA -keystore ${JKS_FILE} -keysize 2048 -validity 3650 -dname CN=localhost -keypass ${SECRET} -storepass ${SECRET} +keytool -export -alias kafka-test-store -file ${CERT_FILE} -keystore ${JKS_FILE} -keypass ${SECRET} -storepass ${SECRET} +keytool -importkeystore -srckeystore ${JKS_FILE} -srcstorepass ${SECRET} -destkeystore ${PKCS_FILE} -deststoretype PKCS12 -deststorepass ${SECRET} +keytool -keystore ${JKS_TRUST_FILE} -import -file ${CERT_FILE} -keypass ${SECRET} -storepass ${SECRET} -noprompt +keytool -importkeystore -srckeystore ${JKS_TRUST_FILE} -srcstorepass ${SECRET} -destkeystore ${PKCS_TRUST_FILE} -deststoretype PKCS12 -deststorepass ${SECRET} + +rm -f *.crt *.jks +---- diff --git a/integration-tests/kafka-ssl/pom.xml b/integration-tests/kafka-ssl/pom.xml new file mode 100644 index 0000000..e915676 --- /dev/null +++ b/integration-tests/kafka-ssl/pom.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-tests</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-quarkus-integration-test-kafka-ssl</artifactId> + <name>Camel Quarkus :: Integration Tests :: Kafka SSL</name> + <description>Integration tests for Camel Quarkus Kafka SSL</description> + + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jsonb</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jackson</artifactId> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-test-support</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <scope>test</scope> + </dependency> + + <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> diff --git a/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslResource.java b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslResource.java new file mode 100644 index 0000000..eb703ce --- /dev/null +++ b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslResource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import java.time.Duration; + +import javax.enterprise.context.ApplicationScoped; +import javax.json.Json; +import javax.json.JsonObject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +@Path("/test") +@ApplicationScoped +public class KafkaSslResource { + + @Path("/kafka/{topicName}") + @POST + @Produces(MediaType.APPLICATION_JSON) + public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception { + try (Producer<Integer, String> producer = KafkaSupport.createProducer()) { + RecordMetadata meta = producer.send(new ProducerRecord<>(topicName, 1, message)).get(); + + return Json.createObjectBuilder() + .add("topicName", meta.topic()) + .add("partition", meta.partition()) + .add("offset", meta.offset()) + .build(); + } + } + + @Path("/kafka/{topicName}") + @GET + @Produces(MediaType.APPLICATION_JSON) + public JsonObject get(@PathParam("topicName") String topicName) { + try (KafkaConsumer<Integer, String> consumer = KafkaSupport.createConsumer(topicName)) { + ConsumerRecord<Integer, String> record = consumer.poll(Duration.ofSeconds(60)).iterator().next(); + return Json.createObjectBuilder() + .add("topicName", record.topic()) + .add("partition", record.partition()) + .add("offset", record.offset()) + .add("key", record.key()) + .add("body", record.value()) + .build(); + } + } +} diff --git a/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslRoutes.java b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslRoutes.java new file mode 100644 index 0000000..0099173 --- /dev/null +++ b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslRoutes.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import org.apache.camel.builder.RouteBuilder; + +public class KafkaSslRoutes extends RouteBuilder { + @Override + public void configure() throws Exception { + from("kafka:inbound") + .to("log:kafka") + .to("kafka:outbound"); + } +} diff --git a/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSupport.java b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSupport.java new file mode 100644 index 0000000..f6a6c0b --- /dev/null +++ b/integration-tests/kafka-ssl/src/main/java/org/apache/camel/quarkus/kafka/ssl/KafkaSupport.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; + +public final class KafkaSupport { + + private KafkaSupport() { + } + + public static KafkaConsumer<Integer, String> createConsumer(String topicName) { + Properties props = new Properties(); + configureSSL(props); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(topicName)); + + return consumer; + } + + public static Producer<Integer, String> createProducer() { + Properties props = new Properties(); + configureSSL(props); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-consumer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private static void configureSSL(Properties props) { + setKafkaConfigFromCamelConfig(props, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "camel.component.kafka.brokers"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "camel.component.kafka.ssl-key-password"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + "camel.component.kafka.ssl-keystore-location"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + "camel.component.kafka.ssl-keystore-password"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "camel.component.kafka.ssl-keystore-type"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + "camel.component.kafka.ssl-truststore-location"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + "camel.component.kafka.ssl-truststore-password"); + setKafkaConfigFromCamelConfig(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, + "camel.component.kafka.ssl-truststore-type"); + setKafkaConfigFromCamelConfig(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + "camel.component.kafka.security-protocol"); + } + + private static void setKafkaConfigFromCamelConfig(Properties props, String kafkaKey, String camelKey) { + Config config = ConfigProvider.getConfig(); + props.put(kafkaKey, config.getValue(camelKey, String.class)); + } +} diff --git a/integration-tests/kafka-ssl/src/main/resources/application.properties b/integration-tests/kafka-ssl/src/main/resources/application.properties new file mode 100644 index 0000000..8136208 --- /dev/null +++ b/integration-tests/kafka-ssl/src/main/resources/application.properties @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +quarkus.ssl.native=true diff --git a/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslIT.java b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslIT.java new file mode 100644 index 0000000..839cf6d --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslIT.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import io.quarkus.test.junit.NativeImageTest; + +@NativeImageTest +public class KafkaSslIT extends KafkaSslTest { +} diff --git a/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTest.java b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTest.java new file mode 100644 index 0000000..36a020a --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import java.util.UUID; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; +import io.restassured.path.json.JsonPath; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +@QuarkusTest +@QuarkusTestResource(KafkaSslTestResource.class) +public class KafkaSslTest { + + @Test + void testKafkaBridge() { + String body = UUID.randomUUID().toString(); + + RestAssured.given() + .contentType("text/plain") + .body(body) + .post("/test/kafka/inbound") + .then() + .statusCode(200); + + JsonPath result = RestAssured.given() + .get("/test/kafka/outbound") + .then() + .statusCode(200) + .extract() + .body() + .jsonPath(); + + assertThat(result.getString("topicName")).isEqualTo("outbound"); + assertThat(result.getString("body")).isEqualTo(body); + } +} diff --git a/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTestResource.java b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTestResource.java new file mode 100644 index 0000000..d5c20a1 --- /dev/null +++ b/integration-tests/kafka-ssl/src/test/java/org/apache/camel/quarkus/kafka/ssl/KafkaSslTestResource.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.ssl; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import org.apache.camel.util.CollectionHelper; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +public class KafkaSslTestResource implements QuarkusTestResourceLifecycleManager { + + private static final String KAFKA_KEYSTORE_FILE = "kafka-keystore.p12"; + private static final String KAFKA_KEYSTORE_PASSWORD = "kafkas3cret"; + private static final String KAFKA_KEYSTORE_TYPE = "PKCS12"; + private static final String KAFKA_SSL_CREDS_FILE = "broker-creds"; + private static final String KAFKA_TRUSTSTORE_FILE = "kafka-truststore.p12"; + private static final File TMP_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "kafka").toFile(); + private SSLKafkaContainer container; + + @Override + public Map<String, String> start() { + // Set up the SSL key / trust store directory + try { + TMP_DIR.mkdirs(); + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + URL resource = classLoader.getResource("config"); + File serviceBindings = new File(resource.getPath()); + + for (File keyStore : serviceBindings.listFiles()) { + URL serviceBindingResource = classLoader.getResource("config/" + keyStore.getName()); + FileUtils.copyInputStreamToFile(serviceBindingResource.openStream(), + Paths.get(TMP_DIR.getPath(), keyStore.getName()).toFile()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3"); + container = new SSLKafkaContainer(imageName); + container.start(); + + Path keystorePath = TMP_DIR.toPath(); + return CollectionHelper.mapOf( + "camel.component.kafka.brokers", container.getBootstrapServers(), + "camel.component.kafka.security-protocol", "SSL", + "camel.component.kafka.ssl-key-password", KAFKA_KEYSTORE_PASSWORD, + "camel.component.kafka.ssl-keystore-location", keystorePath.resolve(KAFKA_KEYSTORE_FILE).toString(), + "camel.component.kafka.ssl-keystore-password", KAFKA_KEYSTORE_PASSWORD, + "camel.component.kafka.ssl-keystore-type", KAFKA_KEYSTORE_TYPE, + "camel.component.kafka.ssl-truststore-location", keystorePath.resolve(KAFKA_TRUSTSTORE_FILE).toString(), + "camel.component.kafka.ssl-truststore-password", KAFKA_KEYSTORE_PASSWORD, + "camel.component.kafka.ssl-truststore-type", KAFKA_KEYSTORE_TYPE); + } + + @Override + public void stop() { + if (this.container != null) { + try { + this.container.stop(); + FileUtils.deleteDirectory(TMP_DIR); + } catch (Exception e) { + // Ignored + } + } + } + + // KafkaContainer does not support SSL OOTB so we need some customizations + static final class SSLKafkaContainer extends KafkaContainer { + + SSLKafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + + String protocolMap = "SSL:SSL,BROKER:PLAINTEXT"; + String listeners = "SSL://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"; + + withEnv("KAFKA_LISTENERS", listeners); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", protocolMap); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + withEnv("KAFKA_SSL_KEY_CREDENTIALS", KAFKA_SSL_CREDS_FILE); + withEnv("KAFKA_SSL_KEYSTORE_FILENAME", KAFKA_KEYSTORE_FILE); + withEnv("KAFKA_SSL_KEYSTORE_CREDENTIALS", KAFKA_SSL_CREDS_FILE); + withEnv("KAFKA_SSL_KEYSTORE_TYPE", KAFKA_KEYSTORE_TYPE); + withEnv("KAFKA_SSL_TRUSTSTORE_FILENAME", KAFKA_TRUSTSTORE_FILE); + withEnv("KAFKA_SSL_TRUSTSTORE_CREDENTIALS", KAFKA_SSL_CREDS_FILE); + withEnv("KAFKA_SSL_TRUSTSTORE_TYPE", KAFKA_KEYSTORE_TYPE); + withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", ""); + withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false"); + withEmbeddedZookeeper().waitingFor(Wait.forListeningPort()); + withLogConsumer(frame -> System.out.print(frame.getUtf8String())); + } + + @Override + public String getBootstrapServers() { + return String.format("SSL://%s:%s", getHost(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + copyFileToContainer( + MountableFile.forClasspathResource("config/" + KAFKA_KEYSTORE_FILE), + "/etc/kafka/secrets/" + KAFKA_KEYSTORE_FILE); + + copyFileToContainer( + MountableFile.forClasspathResource("config/" + KAFKA_TRUSTSTORE_FILE), + "/etc/kafka/secrets/" + KAFKA_TRUSTSTORE_FILE); + + copyFileToContainer( + Transferable.of(KAFKA_KEYSTORE_PASSWORD.getBytes(StandardCharsets.UTF_8)), + "/etc/kafka/secrets/" + KAFKA_SSL_CREDS_FILE); + } + } +} diff --git a/integration-tests/kafka-ssl/src/test/resources/config/kafka-keystore.p12 b/integration-tests/kafka-ssl/src/test/resources/config/kafka-keystore.p12 new file mode 100644 index 0000000..2585d2e Binary files /dev/null and b/integration-tests/kafka-ssl/src/test/resources/config/kafka-keystore.p12 differ diff --git a/integration-tests/kafka-ssl/src/test/resources/config/kafka-truststore.p12 b/integration-tests/kafka-ssl/src/test/resources/config/kafka-truststore.p12 new file mode 100644 index 0000000..c124e21 Binary files /dev/null and b/integration-tests/kafka-ssl/src/test/resources/config/kafka-truststore.p12 differ diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 1784c79..32ef973 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -136,6 +136,7 @@ <module>jta</module> <module>kafka</module> <module>kafka-sasl</module> + <module>kafka-ssl</module> <module>kamelet</module> <module>kotlin</module> <module>kubernetes</module> diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml index 9bc375b..d19bd9a 100644 --- a/tooling/scripts/test-categories.yaml +++ b/tooling/scripts/test-categories.yaml @@ -127,6 +127,7 @@ group-09: - amqp - kafka - kafka-sasl + - kafka-ssl - messaging - nats - splunk