This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit be88da601a3f0e929bfb4415707faabb088707b3 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 12:34:44 2021 +0100 Convert the HTTP tests to the new reusable sink test base class --- .../http/sink/CamelSinkHTTPITCase.java | 106 ++++++++++----------- 1 file changed, 48 insertions(+), 58 deletions(-) diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java index 17aa85f..ea5d2db 100644 --- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java +++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java @@ -20,17 +20,15 @@ package org.apache.camel.kafkaconnector.http.sink; import java.io.IOException; import java.net.InetAddress; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.http.impl.bootstrap.HttpServer; import org.apache.http.impl.bootstrap.ServerBootstrap; import org.junit.jupiter.api.AfterEach; @@ -45,13 +43,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSinkHTTPITCase extends AbstractKafkaTest { +public class CamelSinkHTTPITCase extends CamelSinkTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkHTTPITCase.class); private static final int HTTP_PORT = NetworkUtils.getFreePort("localhost"); private HttpServer localServer; private HTTPTestValidationHandler validationHandler; + private List<String> replies; + private String topicName; private final int expect = 10; @@ -62,6 +62,8 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest { @BeforeEach public void setUp() throws IOException { + topicName = getTopicForTest(this); + validationHandler = new HTTPTestValidationHandler(10); byte[] ipAddr = new byte[]{127, 0, 0, 1}; InetAddress localhost = InetAddress.getByAddress(ipAddr); @@ -83,76 +85,64 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest { } } + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } - private void putRecords() { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - try { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test"); - } catch (ExecutionException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - break; - } + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + replies = validationHandler + .getReplies() + .get(30, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Unable to ret replies: {}", e.getMessage(), e); + } finally { + latch.countDown(); } } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException, TimeoutException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - ExecutorService service = Executors.newCachedThreadPool(); - service.submit(this::putRecords); - - LOG.debug("Created the consumer ... About to receive messages"); + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + if (replies == null) { + fail("Some messages should have been exchanged, but none seems to have gone through"); + } - List<String> replies = validationHandler.getReplies().get(30, TimeUnit.SECONDS); - if (replies == null) { - fail("Some messages should have been exchanged, but none seems to have gone through"); - } + for (String reply : replies) { + LOG.debug("Received: {} ", reply); + } - for (String reply : replies) { - LOG.debug("Received: {} ", reply); + assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent"); + } else { + fail("Failed to receive the messages within the specified time"); } - - assertEquals(replies.size(), expect, "Did not receive the same amount of messages that were sent"); - } + @Test @Timeout(90) - public void testBasicSendReceive() { - try { - String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc"; + public void testBasicSendReceive() throws Exception { + String url = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc"; - ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withHttpUri(url); + ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic() + .withTopics(topicName) + .withHttpUri(url); - runTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("HTTP test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + runTest(connectorPropertyFactory, topicName, expect); } @Test @Timeout(90) - public void testBasicSendReceiveUsingUrl() { - try { - String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc"; + public void testBasicSendReceiveUsingUrl() throws Exception { + String hostName = localServer.getInetAddress().getHostName() + ":" + HTTP_PORT + "/ckc"; - ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl(hostName) - .buildUrl(); + ConnectorPropertyFactory connectorPropertyFactory = CamelHTTPPropertyFactory.basic() + .withTopics(topicName) + .withUrl(hostName) + .buildUrl(); - - runTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("HTTP test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + runTest(connectorPropertyFactory, topicName, expect); } }