This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 892cc45d07c0966239aeea08ceb0d66122ed5ba0
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 14:05:06 2021 +0100

    Convert the RabbitMQ tests to the new reusable sink test base class
---
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 120 ++++++++++-----------
 1 file changed, 60 insertions(+), 60 deletions(-)

diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
index d2c3ad6..01ad213 100644
--- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
+++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -17,17 +17,18 @@
 package org.apache.camel.kafkaconnector.rabbitmq.sink;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.rabbitmq.client.DeliverCallback;
 import com.rabbitmq.client.Delivery;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService;
 import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -40,13 +41,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class RabbitMQSinkITCase extends AbstractKafkaTest {
+public class RabbitMQSinkITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkITCase.class);
     private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
 
+    private String topicName;
     private RabbitMQClient rabbitMQClient;
     private int received;
     private final int expect = 10;
@@ -57,9 +59,48 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest {
     }
 
     @BeforeEach
-    public void setUp() {
+    public void setUp() throws Exception {
+        topicName = getTopicForTest(this);
         received = 0;
+
         rabbitMQClient =  new RabbitMQClient(rabbitmqService.getAmqpUrl());
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+        rabbitMQClient.start();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        rabbitMQClient.stop();
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
+            if (!this.checkRecord(delivery)) {
+                latch.countDown();
+            }
+        };
+
+        try {
+            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
+        } catch (Exception e) {
+            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(15, TimeUnit.SECONDS)) {
+            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
     }
 
     private boolean checkRecord(Delivery rabbitMQDelivery) {
@@ -75,65 +116,24 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest {
         return true;
     }
 
-    private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        LOG.debug("Creating the consumer ...");
-        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
-        try {
-            rabbitMQClient.start();
-            consumeRabbitMQMessages(latch);
-
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i);
-            }
-
-            LOG.debug("Created the consumer ... About to receive messages");
-
-            latch.await();
-            assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect);
-        } finally {
-            rabbitMQClient.stop();
-        }
-    }
-
     @Test
     @Timeout(90)
     public void testSource() throws Exception {
         ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withUrl("")
-                .append("username", rabbitmqService.connectionProperties().username())
-                .append("password", rabbitmqService.connectionProperties().password())
-                .append("autoDelete", "false")
-                .append("queue", DEFAULT_RABBITMQ_QUEUE)
-                .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
-                .append("skipExchangeDeclare", "true")
-                .append("skipQueueBind", "true")
-                .append("hostname", rabbitmqService.connectionProperties().hostname())
-                .append("portNumber", rabbitmqService.connectionProperties().port())
-                .buildUrl();
-
-        runBasicStringTest(factory);
-    }
-
-    private void consumeRabbitMQMessages(CountDownLatch latch) {
-        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
-            if (!this.checkRecord(delivery)) {
-                latch.countDown();
-            }
-        };
-        try {
-            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
-        } catch (Exception e) {
-            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+                .withTopics(topicName)
+                    .withUrl("")
+                    .append("username", rabbitmqService.connectionProperties().username())
+                    .append("password", rabbitmqService.connectionProperties().password())
+                    .append("autoDelete", "false")
+                    .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                    .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
+                    .append("skipExchangeDeclare", "true")
+                    .append("skipQueueBind", "true")
+                    .append("hostname", rabbitmqService.connectionProperties().hostname())
+                    .append("portNumber", rabbitmqService.connectionProperties().port())
+                    .buildUrl();
+
+        runTest(factory, topicName, expect);
     }
 }

Reply via email to