This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-kafka-connector-0.7.x in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b264559126650aa4c7fe748d1c418a8769f818a7 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Fri Mar 12 17:03:25 2021 +0100 Fixed backported netty-http tests --- .../nettyhttp/sink/CamelSinkNettyhttpITCase.java | 59 ++++++++++++------- .../source/CamelSourceNettyHTTPITCase.java | 68 ++++++++++++++-------- 2 files changed, 84 insertions(+), 43 deletions(-) diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java index 96bd27a..db1e27c 100644 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/sink/CamelSinkNettyhttpITCase.java @@ -17,14 +17,18 @@ package org.apache.camel.kafkaconnector.nettyhttp.sink; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,7 +39,7 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { +public class CamelSinkNettyhttpITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkNettyhttpITCase.class); private MockWebServer mockServer; @@ -52,7 +56,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { @BeforeEach public void setUp() { - topicName = getTopicForTest(this); + topicName = TestUtils.getDefaultTestTopic(this.getClass()); mockServer = new MockWebServer(); received = null; } @@ -64,28 +68,43 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { } } - @Override - protected void consumeMessages(CountDownLatch latch) { + protected void verifyMessages() throws InterruptedException { + String expected = "test 0"; try { - received = mockServer.takeRequest(); + received = mockServer.takeRequest(30, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOG.error("Unable to receive messages: {}", e.getMessage(), e); - } finally { - latch.countDown(); + LOG.error("Unable to receive http requests: {}", e.getMessage(), e); + fail("Failed to receive the messages within the specified time"); } + assertEquals("/test", received.getPath(), "Received path differed"); + assertEquals(expected, received.getBody().readUtf8(), "Received message content differed"); } - @Override - protected void verifyMessages(CountDownLatch latch) throws InterruptedException { - String expected = "Sink test message 0"; - if (latch.await(30, TimeUnit.SECONDS)) { - assertEquals("/test", received.getPath(), "Received path differed"); - assertEquals(expected, received.getBody().readUtf8(), "Received message content differed"); - } else { - fail("Failed to receive the messages within the specified time"); + private void putRecords() { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < expect; i++) { + try { + kafkaClient.produce(topicName, "test " + i); + } catch (ExecutionException e) { + LOG.error("Unable to produce messages: {}", e.getMessage(), e); + } catch (InterruptedException e) { + break; + } } } + public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); + + getKafkaConnectService().initializeConnector(connectorPropertyFactory); + + ExecutorService service = Executors.newCachedThreadPool(); + service.submit(() -> putRecords()); + + verifyMessages(); + } + @Test @Timeout(30) public void testBasicSendReceive() throws Exception { @@ -96,7 +115,7 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { .withPort(mockServer.getPort()) .withPath("test"); mockServer.enqueue(new MockResponse().setResponseCode(200)); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory); } @Test @@ -107,6 +126,6 @@ public class CamelSinkNettyhttpITCase extends CamelSinkTestSupport { .withUrl("http", mockServer.getHostName(), mockServer.getPort(), "test") .buildUrl(); mockServer.enqueue(new MockResponse().setResponseCode(200)); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory); } } diff --git a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java index 0174eb1..fe5d884 100644 --- a/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java +++ b/tests/itests-netty-http/src/test/java/org/apache/camel/kafkaconnector/nettyhttp/source/CamelSourceNettyHTTPITCase.java @@ -18,16 +18,19 @@ package org.apache.camel.kafkaconnector.nettyhttp.source; import java.io.IOException; import java.net.InetAddress; +import java.util.concurrent.ExecutionException; +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; -import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -39,14 +42,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { +public class CamelSourceNettyHTTPITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceNettyHTTPITCase.class); - private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 30000, 40000); + private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost", 20000, 29000); private static final String TEST_MESSAGE = "testMessage"; private String topicName; private final int expect = 1; + private int received; @Override protected String[] getConnectorsInTest() { @@ -54,26 +58,28 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { } @BeforeEach - public void setUp() throws IOException { - topicName = getTopicForTest(this); + public void setUp() { + topicName = TestUtils.getDefaultTestTopic(this.getClass()); } - @Test - @Timeout(90) - public void testBasicSendReceive() throws Exception { + protected void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { + connectorPropertyFactory.log(); - ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic() - .withKafkaTopic(topicName) - .withReceiveBufferSize(10) - .withHost("0.0.0.0") - .withPort(HTTP_PORT) - .withProtocol("http") - .withCamelTypeConverterTransformTo("java.lang.String"); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + LOG.debug("Sending http request"); + produceTestData(); + LOG.debug("Http request sent"); + + LOG.debug("Creating the consumer ..."); + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + LOG.debug("Consuming messages ..."); + kafkaClient.consume(topicName, this::checkRecord); + LOG.debug("Messages consumed."); - runTestBlocking(connectorPropertyFactory, topicName, expect); + assertEquals(received, expect, "Didn't process the expected amount of messages"); } - @Override protected void produceTestData() { int retriesLeft = 10; boolean success = false; @@ -110,10 +116,26 @@ public class CamelSourceNettyHTTPITCase extends CamelSourceTestSupport { } } - @Override - protected void verifyMessages(TestMessageConsumer<?> consumer) { - int received = consumer.consumedMessages().size(); - assertEquals(expect, received, "Didn't process the expected amount of messages"); - assertEquals(TEST_MESSAGE, consumer.consumedMessages().get(0).value().toString()); + protected <T> boolean checkRecord(ConsumerRecord<String, T> record) { + LOG.debug("Received: {}", record.value()); + received++; + assertEquals(TEST_MESSAGE, record.value().toString()); + + return false; + } + + @Test + @Timeout(90) + public void testBasicSendReceive() throws Exception { + + ConnectorPropertyFactory connectorPropertyFactory = CamelNettyHTTPPropertyFactory.basic() + .withKafkaTopic(topicName) + .withReceiveBufferSize(10) + .withHost("0.0.0.0") + .withPort(HTTP_PORT) + .withProtocol("http") + .withCamelTypeConverterTransformTo("java.lang.String"); + + runTest(connectorPropertyFactory); } }