This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 5bc30a032b0def5490f3f7859a1062f7e681df6f Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Tue Mar 2 11:53:49 2021 +0900 Add netty sink itest #1036 --- .../netty/sink/CamelNettyPropertyFactory.java | 61 +++++++++++ .../netty/sink/CamelSinkNettyITCase.java | 119 +++++++++++++++++++++ .../netty/source/CamelSourceNettyITCase.java | 5 +- 3 files changed, 182 insertions(+), 3 deletions(-) diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java new file mode 100644 index 0000000..31b5343 --- /dev/null +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelNettyPropertyFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.netty.sink; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +final class CamelNettyPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyPropertyFactory> { + + private CamelNettyPropertyFactory() { + } + + public CamelNettyPropertyFactory withProtocol(String value) { + return setProperty("camel.sink.path.protocol", value); + } + + public CamelNettyPropertyFactory withHost(String value) { + return setProperty("camel.sink.path.host", value); + } + + public CamelNettyPropertyFactory withPort(int value) { + return setProperty("camel.sink.path.port", value); + } + + public CamelNettyPropertyFactory withDisconnect(boolean value) { + return setProperty("camel.sink.endpoint.disconnect", value); + } + + public CamelNettyPropertyFactory withSync(boolean value) { + return setProperty("camel.sink.endpoint.sync", value); + } + + public EndpointUrlBuilder<CamelNettyPropertyFactory> withUrl(String protocol, String host, int port) { + String url = String.format("netty:%s://%s:%s", protocol, host, port); + return new EndpointUrlBuilder<>(this::withSinkUrl, url); + } + + public static CamelNettyPropertyFactory basic() { + return new CamelNettyPropertyFactory() + .withName("CamelNettySinkConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.netty.CamelNettySinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java new file mode 100644 index 0000000..bb08243 --- /dev/null +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/sink/CamelSinkNettyITCase.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.netty.sink; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CamelSinkNettyITCase extends CamelSinkTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyITCase.class); + private static final int PORT = NetworkUtils.getFreePort("localhost"); + + private String topicName; + + private final int expect = 1; + private volatile String received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-netty-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); + received = null; + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try (ServerSocket serverSocket = new ServerSocket(PORT); + Socket socket = serverSocket.accept(); + InputStream is = socket.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(is))) { + received = reader.readLine(); + LOG.debug("Received: {}", received); + } catch (IOException e) { + LOG.error("Unable to receive messages: {}", e.getMessage(), e); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + String expected = "Sink test message 0"; + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals(expected, received, "Received message content differed"); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + @Test + @Timeout(30) + public void testBasicSendReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory.basic() + .withTopics(topicName) + .withProtocol("tcp") + // TODO https://github.com/apache/camel-kafka-connector/issues/924 + .withHost("//localhost") + .withPort(PORT) + // disconnect so that it won't keep mock server socket forever + .withDisconnect(true) + // one-way as mock server doesn't send replies + .withSync(false); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(30) + public void testBasicSendReceiveUsingUrl() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory.basic() + .withTopics(topicName) + .withUrl("tcp", "localhost", PORT) + // disconnect so that it won't keep mock server socket forever + .append("disconnect", "true") + // one-way as mock server doesn't send replies + .append("sync", "false") + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java index 6c76789..5384e22 100644 --- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java @@ -25,7 +25,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -80,13 +79,13 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Test @Timeout(30) - @Disabled("Camel-Netty-* connectors are not working #924") public void testLaunchConnector() throws ExecutionException, InterruptedException { CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory .basic() .withKafkaTopic(topicName) .withProtocol("tcp") - .withHost("localhost") + // TODO https://github.com/apache/camel-kafka-connector/issues/924 + .withHost("//localhost") .withPort(PORT) // one-way as test client doesn't receive response .withSync(false);