This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit f6a3e04983ecd03cfd6a1bba3423425f00f02ec5 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Sat Mar 6 02:44:30 2021 +0100 Fixed itest for netty-http. --- .../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 5 +- .../source/CamelNettyhttpPropertyFactory.java | 63 ------------ .../source/CamelSourceNettyHTTPITCase.java | 2 +- .../source/CamelSourceNettyhttpITCase.java | 109 --------------------- tests/pom.xml | 1 - 5 files changed, 4 insertions(+), 176 deletions(-) diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java index cdf8b2c..96bd27a 100644 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java @@ -20,6 +20,7 @@ package org.apache.camel.kafkaconnector.nettyhttp.sink; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; @@ -94,7 +95,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { .withHost(mockServer.getHostName()) .withPort(mockServer.getPort()) .withPath("test"); - + mockServer.enqueue(new MockResponse().setResponseCode(200)); runTest(connectorPropertyFactory, topicName, expect); } @@ -105,7 +106,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { .withTopics(topicName) .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test") .buildUrl(); - + mockServer.enqueue(new MockResponse().setResponseCode(200)); runTest(connectorPropertyFactory, topicName, expect); } } diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java deleted file mode 100644 index d97340f..0000000 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelNettyhttpPropertyFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.nettyhttp.source; - -import org.apache.camel.LoggingLevel; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; - -final class CamelNettyhttpPropertyFactory extends SourceConnectorPropertyFactory<CamelNettyhttpPropertyFactory> { - - private CamelNettyhttpPropertyFactory() { - } - - public CamelNettyhttpPropertyFactory withProtocol(String value) { - return setProperty("camel.source.path.protocol", value); - } - - public CamelNettyhttpPropertyFactory withHost(String value) { - return setProperty("camel.source.path.host", value); - } - - public CamelNettyhttpPropertyFactory withPort(int value) { - return setProperty("camel.source.path.port", value); - } - - public CamelNettyhttpPropertyFactory withPath(String value) { - return setProperty("camel.source.path.path", value); - } - - public EndpointUrlBuilder<CamelNettyhttpPropertyFactory> withUrl(String protocol, String host, int port, String path) { - String url = String.format("netty-http:%s://%s:%s/%s", protocol, host, port, path); - return new EndpointUrlBuilder<>(this::withSourceUrl, url); - } - - public static CamelNettyhttpPropertyFactory basic() { - return new CamelNettyhttpPropertyFactory() - .withName("CamelNettyhttpSourceConnector") - .withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.nettyhttp.CamelNettyhttpSourceConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withTransformsConfig("tostring") - .withEntry("type", "org.apache.camel.kafkaconnector.transforms.CamelTypeConverterTransform$Value") - .withEntry("target.type", "java.lang.String") - .end() - .withSourceContentLogginglevel(LoggingLevel.DEBUG); - } -} diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java index 41cb6e1..48bcb59 100644 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java @@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class); - private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost"); + private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000); private static final String TEST_MESSAGE = "testMessage"; private String topicName; diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java deleted file mode 100644 index e1c28de..0000000 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyhttpITCase.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.nettyhttp.source; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; - -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; -import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; -import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@Disabled("Convert NettyChannelBufferStreamCache from NettyHttpSource not converted to string #969") -public class CamelSourceNettyhttpITCase extends CamelSourceTestSupport { - private final String host = NetworkUtils.getHostname(); - private final int port = NetworkUtils.getFreePort(); - - private final int expect = 1; - private String topicName; - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-netty-http-kafka-connector"}; - } - - @BeforeEach - public void setUp() { - topicName = getTopicForTest(this); - } - - @Override - protected void produceTestData() { - TestUtils.waitFor(() -> NetworkUtils.portIsOpen(host, port)); - sendMessage(); - } - - void sendMessage() { - OkHttpClient client = new OkHttpClient(); - RequestBody body = RequestBody.create(MediaType.get("text/plain; charset=utf-8"), "Hello CKC!"); - Request request = new Request.Builder() - .url("http://" + host + ":" + port + "/test") - .post(body) - .build(); - try (Response response = client.newCall(request).execute()) { - assertEquals(200, response.code(), "Source endpoint didn't return 200"); - } catch (IOException e) { - fail(e.getMessage(), e); - } - } - - @Override - protected void verifyMessages(TestMessageConsumer<?> consumer) { - int received = consumer.consumedMessages().size(); - String receivedObject = (String) consumer.consumedMessages().get(0).value(); - assertEquals(expect, received, "Did not receive as many messages as expected"); - assertEquals("Hello CKC!", receivedObject, "Received message content differed"); - } - - @Test - @Timeout(30) - public void testLaunchConnector() throws ExecutionException, InterruptedException { - CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() - .withKafkaTopic(topicName) - .withProtocol("http") - .withHost(host) - .withPort(port) - .withPath("test"); - - runTest(connectorPropertyFactory, topicName, expect); - } - - @Test - @Timeout(30) - public void testLaunchConnectorUsingUrl() throws ExecutionException, InterruptedException { - CamelNettyhttpPropertyFactory connectorPropertyFactory = CamelNettyhttpPropertyFactory.basic() - .withKafkaTopic(topicName) - .withUrl("http", host, port, "test") - .buildUrl(); - - runTest(connectorPropertyFactory, topicName, expect); - } -} diff --git a/tests/pom.xml b/tests/pom.xml index 16eeb17..0c3ff80 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -54,7 +54,6 @@ <module>itests-salesforce</module> <module>itests-hdfs</module> <module>itests-mongodb</module> - <module>itests-netty-http</module> <module>itests-jdbc</module> <module>itests-azure-storage-blob</module> <module>itests-azure-storage-queue</module>