This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b6a1bb9320d57b6d3408f2d4e17c426c71bd03b3
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 15:59:57 2021 +0100

    Convert the Syslog tests to the new reusable sink test base class
---
 .../syslog/sink/CamelSinkSyslogITCase.java         | 71 ++++++++++++----------
 1 file changed, 39 insertions(+), 32 deletions(-)

diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 9273964..1b9f942 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -16,19 +16,19 @@
  */
 package org.apache.camel.kafkaconnector.syslog.sink;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -39,15 +39,14 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkSyslogITCase extends AbstractKafkaTest {
+public class CamelSinkSyslogITCase extends CamelSinkTestSupport {
     private static final int FREE_PORT = NetworkUtils.getFreePort("localhost", NetworkUtils.Protocol.UDP);
+    private static final String TEST_TXT = "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!";
 
     @RegisterExtension
     public static SyslogService syslogService = new SyslogService("udp", "//localhost", FREE_PORT);
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkSyslogITCase.class);
-
-    private int received;
+    private String topicName;
     private final int expect = 1;
 
     @Override
@@ -57,36 +56,44 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
+        topicName = getTopicForTest(this);
     }
 
-    private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+    @Override
+    protected String testMessageContent(int current) {
+        return TEST_TXT;
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
-        LOG.debug("Creating the producer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!");
-        LOG.debug("Created the producer ...");
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        latch.countDown();
+    }
 
-        assertEquals("<13>1 2020-05-14T14:47:01.198+02:00 nathannever myapp - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"11266\"] FOO BAR!", syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(30, TimeUnit.SECONDS)) {
+            assertEquals(TEST_TXT, syslogService.getFirstExchangeToBeReceived().getIn().getBody(String.class));
+        } else {
+            fail("Timed out wait for data to be added to the Kafka cluster");
+        }
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
-                    .basic()
-                    .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withHost("localhost")
-                    .withPort(FREE_PORT)
-                    .withProtocol("udp");
-
-            runBasicProduceTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("Syslog test failed: {} {}", e.getMessage(), e);
-            fail(e.getMessage(), e);
-        }
+    public void testBasicReceive() throws Exception {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelSyslogPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withHost("localhost")
+                .withPort(FREE_PORT)
+                .withProtocol("udp");
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }

Reply via email to