This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b6a1bb9320d57b6d3408f2d4e17c426c71bd03b3 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 15:59:57 2021 +0100 Convert the Syslog tests to the new reusable sink test base class --- .../syslog/sink/CamelSinkSyslogITCase.java | 71 ++++++++++++---------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java index 9273964..1b9f942 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java @@ -16,19 +16,19 @@ */ package org.apache.camel.kafkaconnector.syslog.sink; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.kafkaconnector.syslog.services.SyslogService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -39,15 +39,14 @@ import static org.junit.jupiter.api.Assertions.fail; * messages */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSinkSyslogITCase extends AbstractKafkaTest { +public class CamelSinkSyslogITCase extends CamelSinkTestSupport { private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP); + private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!"; @RegisterExtension public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT); - private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class); - - private int received; + private String topicName; private final int expect = 1; @Override @@ -57,36 +56,44 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - received = 0; + topicName = getTopicForTest(this); } - private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + @Override + protected String testMessageContent(int current) { + return TEST_TXT; + } + + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } - LOG.debug("Creating the producer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!"); - LOG.debug("Created the producer ..."); + @Override + protected void consumeMessages(CountDownLatch latch) { + latch.countDown(); + } - assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class)); + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class)); + } else { + fail("Timed out wait for data to be added to the Kafka cluster"); + } } + @Test @Timeout(90) - public void testBasicReceive() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory - .basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withHost("localhost") - .withPort(FREE_PORT) - .withProtocol("udp"); - - runBasicProduceTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("Syslog test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + public void testBasicReceive() throws Exception { + ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory + .basic() + .withTopics(topicName) + .withHost("localhost") + .withPort(FREE_PORT) + .withProtocol("udp"); + + runTest(connectorPropertyFactory, topicName, expect); } }