This is an automated email from the ASF dual-hosted git repository. tsato pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 4d52cb141bb3508a68fc54a2fbecf8f8c4d8b7d7 Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Thu Mar 11 13:01:11 2021 +0900 Add https itests --- .../common/BasicConnectorPropertyFactory.java | 4 + .../common/ConnectorPropertyFactory.java | 2 +- .../common/SinkConnectorPropertyFactory.java | 7 + tests/itests-https/pom.xml | 57 +++++++ .../https/sink/CamelHTTPSPropertyFactory.java | 68 ++++++++ .../https/sink/CamelSinkHTTPSITCase.java | 179 +++++++++++++++++++++ .../src/test/resources/client-truststore.jks | Bin 0 -> 844 bytes .../src/test/resources/server-keystore.jks | Bin 0 -> 2134 bytes tests/pom.xml | 1 + 9 files changed, 317 insertions(+), 1 deletion(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java index 0e98490..20c3558 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/BasicConnectorPropertyFactory.java @@ -100,6 +100,10 @@ public abstract class BasicConnectorPropertyFactory<T extends BasicConnectorProp return "#class:" + className; } + public static String classRef(Class<?> clazz) { + return "#class:" + clazz.getName(); + } + public T merge(Properties properties) { Set<Map.Entry<Object, Object>> set = properties.entrySet(); connectorProps.putAll(set.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b)->b))); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java index 8ce678d..a89695a 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/ConnectorPropertyFactory.java @@ -41,6 +41,6 @@ public interface ConnectorPropertyFactory { Logger log = LoggerFactory.getLogger(ConnectorPropertyFactory.class); log.info("Using the following properties for the test: "); - properties.entrySet().forEach(entry -> log.info("{}={}", entry.getKey(), entry.getValue())); + properties.forEach((key, value) -> log.info("{}={}", key, value)); } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java index 356ee0d..d8c70f5 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java @@ -17,7 +17,10 @@ package org.apache.camel.kafkaconnector.common; +import org.apache.camel.LoggingLevel; + import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF; +import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF; public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { @@ -28,4 +31,8 @@ public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorProper public T withSinkUrl(String sinkUrl) { return setProperty(CAMEL_SINK_URL_CONF, sinkUrl); } + + public T withSinkContentLogginglevel(LoggingLevel level) { + return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString()); + } } diff --git a/tests/itests-https/pom.xml b/tests/itests-https/pom.xml new file mode 100644 index 0000000..c550e83 --- /dev/null +++ b/tests/itests-https/pom.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-https</artifactId> + <name>Camel-Kafka-Connector :: Tests :: HTTPS</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-http</artifactId> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>mockwebserver</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java new file mode 100644 index 0000000..31993ca --- /dev/null +++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelHTTPSPropertyFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.https.sink; + +import org.apache.camel.LoggingLevel; +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.support.jsse.KeyStoreParameters; +import org.apache.camel.support.jsse.SSLContextParameters; +import org.apache.camel.support.jsse.TrustManagersParameters; + +final class CamelHTTPSPropertyFactory extends SinkConnectorPropertyFactory<CamelHTTPSPropertyFactory> { + private CamelHTTPSPropertyFactory() { + } + + public CamelHTTPSPropertyFactory withHttpUri(String uri) { + return setProperty("camel.sink.path.httpUri", uri); + } + + public CamelHTTPSPropertyFactory withSslContextParameters(String bean, String keyStore, String password) { + withBeans("ksp", classRef(KeyStoreParameters.class)); + withBeans("ksp.resource", keyStore); + withBeans("ksp.password", password); + + withBeans("tmp", classRef(TrustManagersParameters.class)); + withBeans("tmp.keyStore", "#bean:ksp"); + + withBeans(bean, classRef(SSLContextParameters.class)); + withBeans(bean + ".trustManagers", "#bean:tmp"); + + return setProperty("camel.sink.endpoint.sslContextParameters", "#bean:" + bean); + } + + public CamelHTTPSPropertyFactory withX509HostnameVerifier(String bean, Class<?> verifierClass) { + withBeans(bean, classRef(verifierClass)); + return setProperty("camel.sink.endpoint.x509HostnameVerifier", "#bean:" + bean); + } + + public EndpointUrlBuilder<CamelHTTPSPropertyFactory> withUrl(String host, int port, String path) { + String url = String.format("https://%s:%s/%s", host, port, path); + return new EndpointUrlBuilder<>(this::withSinkUrl, url); + } + + public static CamelHTTPSPropertyFactory basic() { + return new CamelHTTPSPropertyFactory() + .withName("CamelHttpsSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.https.CamelHttpsSinkConnector") + .withTasksMax(1) + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withSinkContentLogginglevel(LoggingLevel.DEBUG); + } +} diff --git a/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java new file mode 100644 index 0000000..e5d6672 --- /dev/null +++ b/tests/itests-https/src/test/java/org/apache/camel/kafkaconnector/https/sink/CamelSinkHTTPSITCase.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.https.sink; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.URL; +import java.security.KeyStore; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSinkHTTPSITCase extends CamelSinkTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPSITCase.class); + + private final String host = NetworkUtils.getHostname(); + private final int port = NetworkUtils.getFreePort(host); + + private MockWebServer mockServer; + + private String topicName; + + private final int expect = 10; + private List<RecordedRequest> received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-https-kafka-connector"}; + } + + @BeforeEach + public void setUp() throws Exception { + topicName = getTopicForTest(this); + + setupHttpsMockServer(); + received = Collections.emptyList(); + } + + private void setupHttpsMockServer() throws Exception { + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load(getClass().getResourceAsStream("/server-keystore.jks"), "secret".toCharArray()); + KeyManagerFactory kmFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmFactory.init(keyStore, "secret".toCharArray()); + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(kmFactory.getKeyManagers(), null, null); + mockServer = new MockWebServer(); + mockServer.useHttps(sslContext.getSocketFactory(), false); + } + + private void startMockServer() throws IOException { + IntStream.range(0, expect).forEach(i -> { + mockServer.enqueue(new MockResponse().setResponseCode(200)); + }); + mockServer.start(InetAddress.getByName(host), port); + } + + @AfterEach + public void tearDown() throws Exception { + if (mockServer != null) { + mockServer.shutdown(); + } + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + received = IntStream.range(0, expect).mapToObj(i -> { + try { + return mockServer.takeRequest(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Unable to receive messages: {}", e.getMessage(), e); + return null; + } + }).collect(Collectors.toList()); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + String expected = "Sink test message "; + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent"); + + + for (RecordedRequest request : received) { + String actual = request.getBody().readUtf8(); + LOG.debug("Received: {} ", actual); + + assertEquals("/ckc", request.getPath(), "Received path differed"); + assertTrue(actual.startsWith(expected), "Received message content differed"); + } + + assertEquals(expect, received.size(), "Did not receive the same amount of messages that were sent"); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + @Test + @Timeout(60) + public void testBasicSendReceive() throws Exception { + startMockServer(); + + String uri = mockServer.getHostName() + ":" + mockServer.getPort() + "/ckc"; + ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic() + .withTopics(topicName) + .withHttpUri(uri) + .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret") + // let's skip host verification as hostname may vary depending on test env + .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(60) + public void testBasicSendReceiveUsingUrl() throws Exception { + startMockServer(); + + ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPSPropertyFactory.basic() + .withTopics(topicName) + .withSslContextParameters("scp", toPath("client-truststore.jks"), "secret") + // let's skip host verification as hostname may vary depending on test env + .withX509HostnameVerifier("x509HostnameVerifier", NoopHostnameVerifier.class) + .withUrl(mockServer.getHostName(), mockServer.getPort(), "ckc") + .append("sslContextParameters", "#bean:scp") + .append("x509HostnameVerifier", "#bean:x509HostnameVerifier") + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); + } + + private String toPath(String resource) { + URL url = Objects.requireNonNull(getClass().getClassLoader().getResource(resource)); + return url.getPath(); + } +} diff --git a/tests/itests-https/src/test/resources/client-truststore.jks b/tests/itests-https/src/test/resources/client-truststore.jks new file mode 100644 index 0000000..d4c5d58 Binary files /dev/null and b/tests/itests-https/src/test/resources/client-truststore.jks differ diff --git a/tests/itests-https/src/test/resources/server-keystore.jks b/tests/itests-https/src/test/resources/server-keystore.jks new file mode 100644 index 0000000..a230624 Binary files /dev/null and b/tests/itests-https/src/test/resources/server-keystore.jks differ diff --git a/tests/pom.xml b/tests/pom.xml index 0c3ff80..cff0878 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -49,6 +49,7 @@ <module>itests-syslog</module> <module>itests-file</module> <module>itests-http</module> + <module>itests-https</module> <module>itests-timer</module> <module>itests-slack</module> <module>itests-salesforce</module>