This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit de00cf1f2460089ac3d2a503a5a73c419263d9f5
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Feb 9 09:48:12 2021 +0100

    Converted the Syslog source test case to use the reusable source base class
---
 .../syslog/source/CamelSourceSyslogITCase.java     | 124 ++++++++++-----------
 1 file changed, 59 insertions(+), 65 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index 34d8228..7f14a2e 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -17,17 +17,22 @@
 
 package org.apache.camel.kafkaconnector.syslog.source;
 
+import java.util.concurrent.ExecutionException;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.syslog.SyslogDataFormat;
 import org.apache.camel.component.syslog.netty.Rfc5425Encoder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.TestInstance;
@@ -36,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
 
 
 /**
@@ -44,91 +48,81 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceSyslogITCase extends AbstractKafkaTest {
-    private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
-
+public class CamelSourceSyslogITCase extends CamelSourceTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceSyslogITCase.class);
+    private static final String HOST = "localhost";
+    private static final String PROTOCOL = "udp";
+    private static final int FREE_PORT = NetworkUtils.getFreePort(HOST, NetworkUtils.Protocol.UDP);
 
-    private int received;
     private final int expect = 1;
+    private ConnectorPropertyFactory connectorPropertyFactory;
+    private String topicName;
+
+    private CamelContext camelContext;
 
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-syslog-kafka-connector"};
     }
 
+    @BeforeAll
+    public void setupCamelContext() throws Exception {
+        LOG.debug("Creating the Camel context");
+        camelContext = new DefaultCamelContext();
+        camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
+
+        LOG.debug("Adding routes");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:test")
+                        .marshal(new SyslogDataFormat())
+                        .toF("netty:%s://%s:%d?sync=false&encoders=#encoder&useByteBuf=true", PROTOCOL, HOST, FREE_PORT);
+            }
+        });
+    }
+
     @BeforeEach
     public void setUp() {
-        received = 0;
-    }
+        topicName = getTopicForTest(this);
 
-    private void produceLogMessages(String protocol, String host, String port, String message) {
-        CamelContext camelContext = new DefaultCamelContext();
-
-        try {
-            camelContext.getRegistry().bind("encoder", new Rfc5425Encoder());
-            camelContext.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() {
-                    from("direct:test").marshal(new SyslogDataFormat()).to("netty:" + protocol + ":" + host + ":" + port + "?sync=false&encoders=#encoder&useByteBuf=true");
-                }
-            });
-
-            camelContext.start();
-            camelContext.createProducerTemplate().sendBody("direct:test", message);
-        } catch (Exception e) {
-            LOG.error("Failed to send log messages {} to : {}", message, "netty:" + protocol + ":" + host + ":" + port);
-            fail(e.getMessage());
-        } finally {
-            camelContext.stop();
-        }
+        camelContext.start();
+        TestUtils.waitFor(camelContext::isStarted);
     }
 
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
+    @AfterEach
+    public void tearDown() {
+        camelContext.stop();
     }
 
-    private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected void produceTestData() {
+        String message = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
-        // Add a small delay to let Camel finish netty initialization otherwise the port may be unreachable
-        Thread.sleep(1000);
-        produceLogMessages(connectorPropertyFactory.getProperties().get("camel.source.path.protocol").toString(),
-                connectorPropertyFactory.getProperties().get("camel.source.path.host").toString(),
-                connectorPropertyFactory.getProperties().get("camel.source.path.port").toString(),
-                "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
+        camelContext.createProducerTemplate().sendBody("direct:test", message);
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @RepeatedTest(3)
     @Timeout(90)
-    public void testBasicSend() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHost("localhost")
-                    .withPort(FREE_PORT)
-                    .withProtocol("udp");
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+    public void testBasicSend() throws ExecutionException, InterruptedException {
+        connectorPropertyFactory = CamelSyslogPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withHost(HOST)
+                .withPort(FREE_PORT)
+                .withProtocol(PROTOCOL);
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        StringMessageConsumer stringMessageConsumer = new StringMessageConsumer(kafkaClient, topicName, expect);
+
+        runTestBlocking(connectorPropertyFactory, stringMessageConsumer);
     }
 }

Reply via email to