This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit b55096ed893a0d74ec32821b510df2bbd71c68b0 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 8 20:03:43 2021 +0100 Converted the RabbitMQ source test case to use the reusable source base class --- .../rabbitmq/source/RabbitMQSourceITCase.java | 64 +++++++++------------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java index 15950e7..4ef2ae2 100644 --- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java @@ -18,14 +18,12 @@ package org.apache.camel.kafkaconnector.rabbitmq.source; import java.util.concurrent.ExecutionException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class RabbitMQSourceITCase extends 
AbstractKafkaTest { +public class RabbitMQSourceITCase extends CamelSourceTestSupport { @RegisterExtension public static RabbitMQService rabbitmqService = RabbitMQServiceFactory.createService(); @@ -45,7 +43,7 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest { private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import"; private RabbitMQClient rabbitMQClient; - private int received; + private String topicName; private final int expect = 10; @Override @@ -55,55 +53,43 @@ public class RabbitMQSourceITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - received = 0; + topicName = getTopicForTest(this); rabbitMQClient = new RabbitMQClient(rabbitmqService.getAmqpUrl()); - } - - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - return true; - } - - public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); + } + @Override + protected void produceTestData() { for (int i = 0; i < expect; i++) { rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message"); } + } - LOG.debug("Creating the kafka consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the kafka consumer ..."); - + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + @Test @Timeout(90) public void testSource() throws ExecutionException, InterruptedException { 
ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory .basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withUrl("") - .append("username", rabbitmqService.connectionProperties().username()) - .append("password", rabbitmqService.connectionProperties().password()) - .append("autoDelete", "false") - .append("queue", DEFAULT_RABBITMQ_QUEUE) - .append("skipExchangeDeclare", "true") - .append("skipQueueBind", "true") - .append("hostname", rabbitmqService.connectionProperties().hostname()) - .append("portNumber", rabbitmqService.connectionProperties().port()) - .buildUrl(); - - runBasicStringTest(factory); + .append("username", rabbitmqService.connectionProperties().username()) + .append("password", rabbitmqService.connectionProperties().password()) + .append("autoDelete", "false") + .append("queue", DEFAULT_RABBITMQ_QUEUE) + .append("skipExchangeDeclare", "true") + .append("skipQueueBind", "true") + .append("hostname", rabbitmqService.connectionProperties().hostname()) + .append("portNumber", rabbitmqService.connectionProperties().port()) + .buildUrl(); + + runTest(factory, topicName, expect); } }