This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b253e13a83ee65b9011ce1fb4dc5ec20cc577eca Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Thu Feb 25 23:17:26 2021 +0900 Add Netty source itest #1036 --- .../common/test/CamelSourceTestSupport.java | 15 +++ tests/itests-netty/pom.xml | 45 +++++++++ .../netty/source/CamelNettyPropertyFactory.java | 57 +++++++++++ .../netty/source/CamelSourceNettyITCase.java | 110 +++++++++++++++++++++ tests/pom.xml | 1 + 5 files changed, 228 insertions(+) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java index 5bb6a93..016525a 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java @@ -88,6 +88,21 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest { * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results * * @param connectorPropertyFactory A factory for connector properties + * @param topic the topic to send the messages to + * @param count the number of messages to send + * @throws Exception For test-specific exceptions + */ + public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws ExecutionException, InterruptedException { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + StringMessageConsumer consumer = new StringMessageConsumer(kafkaClient, topic, count); + + runTestBlocking(connectorPropertyFactory, consumer); + } + + /** + * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results + * + * @param connectorPropertyFactory A factory for connector properties * @param consumer A Kafka consumer consumer for the test messages * @throws Exception For test-specific exceptions */ diff --git a/tests/itests-netty/pom.xml b/tests/itests-netty/pom.xml new file mode 100644 index 0000000..1ef8735 --- /dev/null +++ b/tests/itests-netty/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-netty</artifactId> + <name>Camel-Kafka-Connector :: Tests :: Netty</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-netty</artifactId> + </dependency> + </dependencies> +</project> diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java new file mode 100644 index 0000000..d0f6132 --- /dev/null +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelNettyPropertyFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.netty.source; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + +final class CamelNettyPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyPropertyFactory> { + + private CamelNettyPropertyFactory() { + } + + public CamelNettyPropertyFactory withProtocol(String value) { + return setProperty("camel.source.path.protocol", value); + } + + public CamelNettyPropertyFactory withHost(String value) { + return setProperty("camel.source.path.host", value); + } + + public CamelNettyPropertyFactory withPort(int value) { + return setProperty("camel.source.path.port", value); + } + + public CamelNettyPropertyFactory withSync(boolean value) { + return setProperty("camel.source.endpoint.sync", value); + } + + public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) { + String url = String.format("netty:%s://%s:%s", protocol, host, port); + return new EndpointUrlBuilder<>(this::withSourceUrl, url); + } + + public static CamelNettyPropertyFactory basic() { + return new CamelNettyPropertyFactory() + .withName("CamelNettySourceConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java new file mode 100644 index 0000000..6c76789 --- /dev/null +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.netty.source; + +import java.io.PrintWriter; +import java.net.Socket; +import java.util.concurrent.ExecutionException; + +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; +import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CamelSourceNettyITCase extends CamelSourceTestSupport { + private static final int PORT = NetworkUtils.getFreePort("localhost"); + + private final int expect = 1; + private String topicName; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-netty-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); + } + + @Override + protected void produceTestData() { + try { + // TODO necessary to wait for ckc netty endpoint to be up and ready + Thread.sleep(3000); + } catch (Exception ignored) { + } + sendMessage(); + } + + void sendMessage() { + try (Socket s = new Socket("localhost", PORT); + PrintWriter out = new PrintWriter(s.getOutputStream())) { + out.print("Hello CKC!"); + out.flush(); + } catch (Exception e) { + fail(e.getMessage(), e); + } + } + + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); + Object receivedObject = consumer.consumedMessages().get(0).value(); + assertEquals(expect, received, "Did not receive as many messages as expected"); + assertEquals("Hello CKC!", receivedObject, "Received message content differed"); + } + + @Test + @Timeout(30) + @Disabled("Camel-Netty-* connectors are not working #924") + public void testLaunchConnector() throws ExecutionException, InterruptedException { + CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withProtocol("tcp") + .withHost("localhost") + .withPort(PORT) + // one-way as test client doesn't receive response + .withSync(false); + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(30) + public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException { + CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withUrl("tcp", "localhost", PORT) + // one-way as test client doesn't receive response + .append("sync", "false") + .buildUrl(); + + runTestBlocking(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/pom.xml b/tests/pom.xml index 26aa43f..b1ec328 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -62,6 +62,7 @@ <module>itests-ssh</module> <module>itests-sql</module> <module>itests-cxf</module> + <module>itests-netty</module> </modules>