This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 0fa21491c49652a8ba4ef6b863b2429cf9d06318 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 8 18:35:26 2021 +0100 Converted the Azure storage queue source test case to use the reusable source base class --- .../source/CamelSourceAzureStorageQueueITCase.java | 57 +++++++--------------- 1 file changed, 18 insertions(+), 39 deletions(-) diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java index f13eeb1..26faa6e 100644 --- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java +++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java @@ -24,14 +24,13 @@ import com.azure.storage.queue.QueueClient; import com.azure.storage.queue.QueueServiceClient; import org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration; import org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder; import org.apache.camel.test.infra.azure.common.services.AzureService; import org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory; -import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -39,23 +38,20 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; @Disabled(value = "Disabled due to issue #976") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest { +public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport { @RegisterExtension public static AzureService service = AzureStorageQueueServiceFactory.createAzureService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAzureStorageQueueITCase.class); private QueueServiceClient client; private QueueClient queueClient; private String queueName; + private String topicName; private int expect = 10; - private int received; @Override protected String[] getConnectorsInTest() { @@ -64,11 +60,11 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); client = AzureStorageClientUtils.getClient(); queueName = "test-queue" + TestUtils.randomWithRange(0, 100); queueClient = client.createQueue(queueName); - received = 0; } @AfterEach @@ -78,39 +74,22 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest { } } - private void sendMessages() { - for (int i = 0; i < expect; i++) { - queueClient.sendMessage("Test message " + i); - } - } - - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void 
runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - + @Override + protected void produceTestData() { sendMessages(); + } - LOG.debug("Initialized the connector and put the data for the test execution"); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } + private void sendMessages() { + for (int i = 0; i < expect; i++) { + queueClient.sendMessage("Test message " + i); + } + } @Test @Timeout(90) @@ -120,12 +99,12 @@ public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest { ConnectorPropertyFactory connectorPropertyFactory = CamelSourceAzureStorageQueuePropertyFactory .basic() .withConfiguration(TestQueueConfiguration.class.getName()) - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withAccessKey(azureCredentialsHolder.accountKey()) .withAccountName(azureCredentialsHolder.accountName()) .withQueueName(queueName); - runtTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } }