This is an automated email from the ASF dual-hosted git repository. tsato pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new c2d6a93 Add netty-http itests #1036 c2d6a93 is described below commit c2d6a9323dc11bbd40215e09c7775a69ca8f472c Author: Tadayoshi Sato <sato.tadayo...@gmail.com> AuthorDate: Thu Mar 4 19:50:25 2021 +0900 Add netty-http itests #1036 netty-http source itests are disabled due to #969 --- .../common/SinkConnectorPropertyFactory.java | 12 +-- .../common/SourceConnectorPropertyFactory.java | 29 ++++-- tests/itests-netty-http/pom.xml | 56 +++++++++++ .../sink/CamelNettyhttpPropertyFactory.java | 65 ++++++++++++ .../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 111 +++++++++++++++++++++ .../source/CamelNettyhttpPropertyFactory.java | 63 ++++++++++++ .../source/CamelSourceNettyhttpITCase.java} | 62 ++++++------ .../netty/source/CamelSourceNettyITCase.java | 12 +-- tests/itests-parent/pom.xml | 19 +++- tests/pom.xml | 1 + 10 files changed, 374 insertions(+), 56 deletions(-) diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java index 0684164..356ee0d 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SinkConnectorPropertyFactory.java @@ -17,17 +17,15 @@ package org.apache.camel.kafkaconnector.common; -public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { +import static org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF; - public T withTopics(String topics) { - getProperties().put("topics", topics); +public abstract class SinkConnectorPropertyFactory<T extends SinkConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { - return (T) this; + public T withTopics(String topics) { + return setProperty("topics", topics); } public T withSinkUrl(String sinkUrl) { - getProperties().put("camel.sink.url", sinkUrl); - - return (T) this; + return setProperty(CAMEL_SINK_URL_CONF, sinkUrl); } } diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java index 684459c..aa59552 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/SourceConnectorPropertyFactory.java @@ -17,25 +17,32 @@ package org.apache.camel.kafkaconnector.common; -public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { +import org.apache.camel.LoggingLevel; - public T withKafkaTopic(String topic) { - getProperties().put("topics", topic); +import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_NAME; +import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF; +import static org.apache.camel.kafkaconnector.CamelConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF; +import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF; +import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF; +import static org.apache.camel.kafkaconnector.CamelSourceConnectorConfig.TOPIC_CONF; + +public abstract class SourceConnectorPropertyFactory<T extends SourceConnectorPropertyFactory<T>> extends BasicConnectorPropertyFactory<T> { - return (T) this; + public T withKafkaTopic(String topic) { + return setProperty(TOPIC_CONF, topic); } public T withSourceUrl(String sourceUrl) { - getProperties().put("camel.source.url", sourceUrl); + return setProperty(CAMEL_SOURCE_URL_CONF, sourceUrl); + } - return (T) this; + public T withSourceContentLogginglevel(LoggingLevel level) { + return setProperty(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, level.toString()); } public T withAggregate(String aggregate, int size, int timeout) { - withBeans("aggregate", classRef(aggregate)); - getProperties().put("camel.aggregation.size", size); - getProperties().put("camel.aggregation.timeout", timeout); - - return (T) this; + return withBeans(CAMEL_CONNECTOR_AGGREGATE_NAME, classRef(aggregate)) + .setProperty(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, size) + .setProperty(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, timeout); } } diff --git a/tests/itests-netty-http/pom.xml b/tests/itests-netty-http/pom.xml new file mode 100644 index 0000000..16238cf --- /dev/null +++ b/tests/itests-netty-http/pom.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-netty-http</artifactId> + <name>Camel-Kafka-Connector :: Tests :: Netty HTTP</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-netty-http</artifactId> + </dependency> + + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>mockwebserver</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java new file mode 100644 index 0000000..9754e7d --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelNettyhttpPropertyFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.nettyhttp.sink; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +final class CamelNettyhttpPropertyFactory extends SinkConnectorPropertyFactory<CamelNettyhttpPropertyFactory> { + + private CamelNettyhttpPropertyFactory() { + } + + public CamelNettyhttpPropertyFactory withProtocol(String value) { + return setProperty("camel.sink.path.protocol", value); + } + + public CamelNettyhttpPropertyFactory withHost(String value) { + return setProperty("camel.sink.path.host", value); + } + + public CamelNettyhttpPropertyFactory withPort(int value) { + return setProperty("camel.sink.path.port", value); + } + + public CamelNettyhttpPropertyFactory withPath(String value) { + return setProperty("camel.sink.path.path", value); + } + + public CamelNettyhttpPropertyFactory withDisconnect(boolean value) { + return setProperty("camel.sink.endpoint.disconnect", value); + } + + public CamelNettyhttpPropertyFactory withSync(boolean value) { + return setProperty("camel.sink.endpoint.sync", value); + } + + public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) { + String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path); + return new EndpointUrlBuilder<>(this::withSinkUrl, url); + } + + public static CamelNettyhttpPropertyFactory basic() { + return new CamelNettyhttpPropertyFactory() + .withName("CamelNettyhttpSinkConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + } +} diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java new file mode 100644 index 0000000..cdf8b2c --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.nettyhttp.sink; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class); + + private MockWebServer mockServer; + + private String topicName; + + private final int expect = 1; + private volatile RecordedRequest received; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-netty-http-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); + mockServer = new MockWebServer(); + received = null; + } + + @AfterEach + public void tearDown() throws Exception { + if (mockServer != null) { + mockServer.shutdown(); + } + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + received = mockServer.takeRequest(); + } catch (InterruptedException e) { + LOG.error("Unable to receive messages: {}", e.getMessage(), e); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + String expected = "Sink test message 0"; + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals("/test", received.getPath(), "Received path differed"); + assertEquals(expected, received.getBody().readUtf8(), "Received message content differed"); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + @Test + @Timeout(30) + public void testBasicSendReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() + .withTopics(topicName) + .withProtocol("http") + .withHost(mockServer.getHostName()) + .withPort(mockServer.getPort()) + .withPath("test"); + + runTest(connectorPropertyFactory, topicName, expect); + } + + @Test + @Timeout(30) + public void testBasicSendReceiveUsingUrl() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() + .withTopics(topicName) + .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test") + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expect); + } +} diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java new file mode 100644 index 0000000..d97340f --- /dev/null +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.nettyhttp.source; + +import org.apache.camel.LoggingLevel; +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; + +final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> { + + private CamelNettyhttpPropertyFactory() { + } + + public CamelNettyhttpPropertyFactory withProtocol(String value) { + return setProperty("camel.source.path.protocol", value); + } + + public CamelNettyhttpPropertyFactory withHost(String value) { + return setProperty("camel.source.path.host", value); + } + + public CamelNettyhttpPropertyFactory withPort(int value) { + return setProperty("camel.source.path.port", value); + } + + public CamelNettyhttpPropertyFactory withPath(String value) { + return setProperty("camel.source.path.path", value); + } + + public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) { + String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path); + return new EndpointUrlBuilder<>(this::withSourceUrl, url); + } + + public static CamelNettyhttpPropertyFactory basic() { + return new CamelNettyhttpPropertyFactory() + .withName("CamelNettyhttpSourceConnector") + .withTasksMax(1) + .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withTransformsConfig("tostring") + .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value") + .withEntry("target.type", "java.lang.String") + .end() + .withSourceContentLogginglevel(LoggingLevel.DEBUG); + } +} diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java similarity index 58% copy from tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java copy to tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java index b2ef5ee..e1c28de 100644 --- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java @@ -15,23 +15,31 @@ * limitations under the License. */ -package org.apache.camel.kafkaconnector.netty.source; +package org.apache.camel.kafkaconnector.nettyhttp.source; -import java.io.PrintWriter; -import java.net.Socket; +import java.io.IOException; import java.util.concurrent.ExecutionException; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -public class CamelSourceNettyITCase extends CamelSourceTestSupport { +@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969") +public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport { + private final String host = NetworkUtils.getHostname(); private final int port = NetworkUtils.getFreePort(); private final int expect = 1; @@ -39,7 +47,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Override protected String[] getConnectorsInTest() { - return new String[] {"camel-netty-kafka-connector"}; + return new String[] {"camel-netty-http-kafka-connector"}; } @BeforeEach @@ -49,20 +57,20 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Override protected void produceTestData() { - try { - // TODO necessary to wait for ckc netty endpoint to be up and ready - Thread.sleep(3000); - } catch (Exception ignored) { - } + TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port)); sendMessage(); } void sendMessage() { - try (Socket s = new Socket(NetworkUtils.getHostname(), port); - PrintWriter out = new PrintWriter(s.getOutputStream())) { - out.print("Hello CKC!"); - out.flush(); - } catch (Exception e) { + OkHttpClient client = new OkHttpClient(); + RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!"); + Request request = new Request.Builder() + .url("http://" + host + ":" + port + "/test") + .post(body) + .build(); + try (Response response = client.newCall(request).execute()) { + assertEquals(200, response.code(), "Source endpoint didn't return 200"); + } catch (IOException e) { fail(e.getMessage(), e); } } @@ -70,7 +78,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Override protected void verifyMessages(TestMessageConsumer<?> consumer) { int received = consumer.consumedMessages().size(); - Object receivedObject = consumer.consumedMessages().get(0).value(); + String receivedObject = (String) consumer.consumedMessages().get(0).value(); assertEquals(expect, received, "Did not receive as many messages as expected"); assertEquals("Hello CKC!", receivedObject, "Received message content differed"); } @@ -78,30 +86,24 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Test @Timeout(30) public void testLaunchConnector() throws ExecutionException, InterruptedException { - CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory - .basic() + CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() .withKafkaTopic(topicName) - .withProtocol("tcp") - // TODO https://github.com/apache/camel-kafka-connector/issues/924 - .withHost("//" + NetworkUtils.getHostname()) + .withProtocol("http") + .withHost(host) .withPort(port) - // one-way as test client doesn't receive response - .withSync(false); + .withPath("test"); - runTestBlocking(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, topicName, expect); } @Test @Timeout(30) public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException { - CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory - .basic() + CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() .withKafkaTopic(topicName) - .withUrl("tcp", NetworkUtils.getHostname(), port) - // one-way as test client doesn't receive response - .append("sync", "false") + .withUrl("http", host, port, "test") .buildUrl(); - runTestBlocking(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, topicName, expect); } } diff --git a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java index b2ef5ee..481b15c 100644 --- a/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java +++ b/tests/itests-netty/src/test/java/org/apache/camel/kafkaconnector/netty/source/CamelSourceNettyITCase.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -32,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; public class CamelSourceNettyITCase extends CamelSourceTestSupport { + private final String host = NetworkUtils.getHostname(); private final int port = NetworkUtils.getFreePort(); private final int expect = 1; @@ -49,11 +51,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { @Override protected void produceTestData() { - try { - // TODO necessary to wait for ckc netty endpoint to be up and ready - Thread.sleep(3000); - } catch (Exception ignored) { - } + TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port)); sendMessage(); } @@ -83,7 +81,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { .withKafkaTopic(topicName) .withProtocol("tcp") // TODO https://github.com/apache/camel-kafka-connector/issues/924 - .withHost("//" + NetworkUtils.getHostname()) + .withHost("//" + host) .withPort(port) // one-way as test client doesn't receive response .withSync(false); @@ -97,7 +95,7 @@ public class CamelSourceNettyITCase extends CamelSourceTestSupport { CamelNettyPropertyFactory connectorPropertyFactory = CamelNettyPropertyFactory .basic() .withKafkaTopic(topicName) - .withUrl("tcp", NetworkUtils.getHostname(), port) + .withUrl("tcp", host, port) // one-way as test client doesn't receive response .append("sync", "false") .buildUrl(); diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml index 3c37eec..a109494 100644 --- a/tests/itests-parent/pom.xml +++ b/tests/itests-parent/pom.xml @@ -188,6 +188,23 @@ </dependency> </dependencies> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>${squareup-okhttp-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>mockwebserver</artifactId> + <version>${squareup-okhttp-version}</version> + <scope>test</scope> + </dependency> + </dependencies> + </dependencyManagement> + <build> <plugins> <plugin> @@ -220,4 +237,4 @@ </build> -</project> \ No newline at end of file +</project> diff --git a/tests/pom.xml b/tests/pom.xml index 097d2bd..fadfc35 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -64,6 +64,7 @@ <module>itests-sql</module> <module>itests-cxf</module> <module>itests-netty</module> + <module>itests-netty-http</module> </modules>