This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 892cc45d07c0966239aeea08ceb0d66122ed5ba0 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 14:05:06 2021 +0100 Convert the RabbitMQ tests to the new reusable sink test base class --- .../rabbitmq/sink/RabbitMQSinkITCase.java | 120 ++++++++++----------- 1 file changed, 60 insertions(+), 60 deletions(-) diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java index d2c3ad6..01ad213 100644 --- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java @@ -17,17 +17,18 @@ package org.apache.camel.kafkaconnector.rabbitmq.sink; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -40,13 +41,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class RabbitMQSinkITCase extends AbstractKafkaTest { +public class RabbitMQSinkITCase extends CamelSinkTestSupport { @RegisterExtension public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService(); private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSinkITCase.class); private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import"; + private String topicName; private RabbitMQClient rabbitMQClient; private int received; private final int expect = 10; @@ -57,9 +59,48 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest { } @BeforeEach - public void setUp() { + public void setUp() throws Exception { + topicName = getTopicForTest(this); received = 0; + rabbitMQClient = new RabbitMQClient(rabbitmqService.getAmqpUrl()); + rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); + rabbitMQClient.start(); + } + + @AfterEach + public void tearDown() { + rabbitMQClient.stop(); + } + + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + DeliverCallback deliveryCallback = (consumerTag, delivery) -> { + if (!this.checkRecord(delivery)) { + latch.countDown(); + } + }; + + try { + rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback); + } catch (Exception e) { + LOG.error("RabbitMQ test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(15, TimeUnit.SECONDS)) { + assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail("Failed to receive the messages within the specified time"); + } } private boolean checkRecord(Delivery rabbitMQDelivery) { @@ -75,65 +116,24 @@ public class RabbitMQSinkITCase extends AbstractKafkaTest { return true; } - private void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - CountDownLatch latch = new CountDownLatch(1); - - LOG.debug("Creating the consumer ..."); - rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); - try { - rabbitMQClient.start(); - consumeRabbitMQMessages(latch); - - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); - } - - LOG.debug("Created the consumer ... About to receive messages"); - - latch.await(); - assertEquals(received, expect, "Didn't process the expected amount of messages: " + received + " != " + expect); - } finally { - rabbitMQClient.stop(); - } - } - @Test @Timeout(90) public void testSource() throws Exception { ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory .basic() - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) - .withUrl("") - .append("username", rabbitmqService.connectionProperties().username()) - .append("password", rabbitmqService.connectionProperties().password()) - .append("autoDelete", "false") - .append("queue", DEFAULT_RABBITMQ_QUEUE) - .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE) - .append("skipExchangeDeclare", "true") - .append("skipQueueBind", "true") - .append("hostname", rabbitmqService.connectionProperties().hostname()) - .append("portNumber", rabbitmqService.connectionProperties().port()) - .buildUrl(); - - runBasicStringTest(factory); - } - - private void consumeRabbitMQMessages(CountDownLatch latch) { - DeliverCallback deliveryCallback = (consumerTag, delivery) -> { - if (!this.checkRecord(delivery)) { - latch.countDown(); - } - }; - try { - rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback); - } catch (Exception e) { - LOG.error("RabbitMQ test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + .withTopics(topicName) + .withUrl("") + .append("username", rabbitmqService.connectionProperties().username()) + .append("password", rabbitmqService.connectionProperties().password()) + .append("autoDelete", "false") + .append("queue", DEFAULT_RABBITMQ_QUEUE) + .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE) + .append("skipExchangeDeclare", "true") + .append("skipQueueBind", "true") + .append("hostname", rabbitmqService.connectionProperties().hostname()) + .append("portNumber", rabbitmqService.connectionProperties().port()) + .buildUrl(); + + runTest(factory, topicName, expect); } }