This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit de00cf1f2460089ac3d2a503a5a73c419263d9f5 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Feb 9 09:48:12 2021 +0100 Converted the Syslog source test case to use the reusable source base class --- .../syslog/source/CamelSourceSyslogITCase.java | 124 ++++++++++----------- 1 file changed, 59 insertions(+), 65 deletions(-) diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java index 34d8228..7f14a2e 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java @@ -17,17 +17,22 @@ package org.apache.camel.kafkaconnector.syslog.source; +import java.util.concurrent.ExecutionException; + import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.syslog.SyslogDataFormat; import org.apache.camel.component.syslog.netty.Rfc5425Encoder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.TestInstance; @@ -36,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; /** @@ -44,91 +48,81 @@ import static org.junit.jupiter.api.Assertions.fail; * messages */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceSyslogITCase extends AbstractKafkaTest { - private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP); - +public class CamelSourceSyslogITCase extends CamelSourceTestSupport { private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class); + private static final String HOST = "localhost"; + private static final String PROTOCOL = "udp"; + private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP); - private int received; private final int expect = 1; + private ConnectorPropertyFactory connectorPropertyFactory; + private String topicName; + + private CamelContext camelContext; @Override protected String[] getConnectorsInTest() { return new String[] {"camel-syslog-kafka-connector"}; } + @BeforeAll + public void setupCamelContext() throws Exception { + LOG.debug("Creating the Camel context"); + camelContext = new DefaultCamelContext(); + camelContext.getRegistry().bind("encoder", new Rfc5425Encoder()); + + LOG.debug("Adding routes"); + camelContext.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("direct:test") + .marshal(new SyslogDataFormat()) + .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT); + } + }); + } + @BeforeEach public void setUp() { - received = 0; - } + topicName = getTopicForTest(this); - private void produceLogMessages(String protocol, String host, String port, String message) { - CamelContext camelContext = new DefaultCamelContext(); - - try { - camelContext.getRegistry().bind("encoder", new Rfc5425Encoder()); - camelContext.addRoutes(new RouteBuilder() { - @Override - public void configure() { - from("direct:test").marshal(new SyslogDataFormat()).to("netty:" + protocol + ":" + host + ":" + port + "?sync=false&encoders=#encoder&useByteBuf=true"); - } - }); - - camelContext.start(); - camelContext.createProducerTemplate().sendBody("direct:test", message); - } catch (Exception e) { - LOG.error("Failed to send log messages {} to : {}", message, "netty:" + protocol + ":" + host + ":" + port); - fail(e.getMessage()); - } finally { - camelContext.stop(); - } + camelContext.start(); + TestUtils.waitFor(camelContext::isStarted); } - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; + @AfterEach + public void tearDown() { + camelContext.stop(); } - private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + @Override + protected void produceTestData() { + String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!"; - // Add a small delay to let Camel finish netty initialization otherwise the port may be unreachable - Thread.sleep(1000); - produceLogMessages(connectorPropertyFactory.getProperties().get("camel.source.path.protocol").toString(), - connectorPropertyFactory.getProperties().get("camel.source.path.host").toString(), - connectorPropertyFactory.getProperties().get("camel.source.path.port").toString(), - "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!"); + camelContext.createProducerTemplate().sendBody("direct:test", message); + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + @RepeatedTest(3) @Timeout(90) - public void testBasicSend() { - try { - ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory - .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withHost("localhost") - .withPort(FREE_PORT) - .withProtocol("udp"); - - runBasicStringTest(connectorPropertyFactory); - } catch (Exception e) { - LOG.error("Syslog test failed: {} {}", e.getMessage(), e); - fail(e.getMessage(), e); - } + public void testBasicSend() throws ExecutionException, InterruptedException { + connectorPropertyFactory = CamelSyslogPropertyFactory + .basic() + .withKafkaTopic(topicName) + .withHost(HOST) + .withPort(FREE_PORT) + .withProtocol(PROTOCOL); + + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + StringMessageConsumer stringMessageConsumer = new StringMessageConsumer(kafkaClient, topicName, expect); + + runTestBlocking(connectorPropertyFactory, stringMessageConsumer); } }