This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0fa21491c49652a8ba4ef6b863b2429cf9d06318
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Mon Feb 8 18:35:26 2021 +0100

    Converted the Azure storage queue source test case to use the reusable source base class
---
 .../source/CamelSourceAzureStorageQueueITCase.java | 57 +++++++---------------
 1 file changed, 18 insertions(+), 39 deletions(-)

diff --git a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
index f13eeb1..26faa6e 100644
--- a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
+++ b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/source/CamelSourceAzureStorageQueueITCase.java
@@ -24,14 +24,13 @@ import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
 import 
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import 
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
 import 
org.apache.camel.test.infra.azure.storage.queue.services.AzureStorageQueueServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -39,23 +38,20 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Disabled(value = "Disabled due to issue #976")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSourceAzureStorageQueueITCase extends CamelSourceTestSupport 
{
     @RegisterExtension
     public static AzureService service = 
AzureStorageQueueServiceFactory.createAzureService();
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceAzureStorageQueueITCase.class);
 
     private QueueServiceClient client;
     private QueueClient queueClient;
     private String queueName;
+    private String topicName;
     private int expect = 10;
-    private int received;
 
     @Override
     protected String[] getConnectorsInTest() {
@@ -64,11 +60,11 @@ public class CamelSourceAzureStorageQueueITCase extends 
AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         client = AzureStorageClientUtils.getClient();
         queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
 
         queueClient = client.createQueue(queueName);
-        received = 0;
     }
 
     @AfterEach
@@ -78,39 +74,22 @@ public class CamelSourceAzureStorageQueueITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void sendMessages() {
-        for (int i = 0; i < expect; i++) {
-            queueClient.sendMessage("Test message " + i);
-        }
-    }
-
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    @Override
+    protected void produceTestData() {
         sendMessages();
+    }
 
-        LOG.debug("Initialized the connector and put the data for the test 
execution");
-
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of 
messages");
     }
 
+    private void sendMessages() {
+        for (int i = 0; i < expect; i++) {
+            queueClient.sendMessage("Test message " + i);
+        }
+    }
 
     @Test
     @Timeout(90)
@@ -120,12 +99,12 @@ public class CamelSourceAzureStorageQueueITCase extends 
AbstractKafkaTest {
         ConnectorPropertyFactory connectorPropertyFactory = 
CamelSourceAzureStorageQueuePropertyFactory
                 .basic()
                 .withConfiguration(TestQueueConfiguration.class.getName())
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withKafkaTopic(topicName)
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withQueueName(queueName);
 
-        runtTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 }

Reply via email to