This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 492f19ab336567285a5197b07082d4feeedf4eff
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 11:27:01 2021 +0100

    Convert the Azure storage queue tests to the new reusable sink test base 
class
---
 .../sink/CamelSinkAzureStorageQueueITCase.java     | 82 ++++++++++------------
 1 file changed, 38 insertions(+), 44 deletions(-)

diff --git 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
index a7c16ed..d447703 100644
--- 
a/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
+++ 
b/tests/itests-azure-storage-queue/src/test/java/org/apache/camel/kafkaconnector/azure/storage/queue/sink/CamelSinkAzureStorageQueueITCase.java
@@ -17,20 +17,17 @@
 
 package org.apache.camel.kafkaconnector.azure.storage.queue.sink;
 
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.azure.storage.queue.QueueClient;
 import com.azure.storage.queue.QueueServiceClient;
 import com.azure.storage.queue.models.PeekedMessageItem;
-import org.apache.camel.kafkaconnector.CamelSinkTask;
 import 
org.apache.camel.kafkaconnector.azure.storage.queue.common.TestQueueConfiguration;
 import 
org.apache.camel.kafkaconnector.azure.storage.services.AzureStorageClientUtils;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.azure.common.AzureCredentialsHolder;
 import org.apache.camel.test.infra.azure.common.services.AzureService;
@@ -45,9 +42,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkAzureStorageQueueITCase extends AbstractKafkaTest {
+public class CamelSinkAzureStorageQueueITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static AzureService service = 
AzureStorageQueueServiceFactory.createAzureService();
 
@@ -56,6 +54,7 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
     private QueueServiceClient client;
     private QueueClient queueClient;
     private String queueName;
+    private String topicName;
     private int expect = 10;
     private int received;
 
@@ -66,6 +65,8 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
+
         client = AzureStorageClientUtils.getClient();
         queueName = "test-queue" + TestUtils.randomWithRange(0, 100);
 
@@ -80,6 +81,30 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
         }
     }
 
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            consume();
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws 
InterruptedException {
+        if (latch.await(120, TimeUnit.SECONDS)) {
+            assertEquals(expect, received,
+                    "Didn't process the expected amount of messages: " + 
received + " != " + expect);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
+
     private void acknowledgeReceived(PeekedMessageItem peekedMessageItem) {
         received++;
         LOG.info("Received: {}", peekedMessageItem.getMessageText());
@@ -97,72 +122,41 @@ public class CamelSinkAzureStorageQueueITCase extends 
AbstractKafkaTest {
         int count = queueClient.getProperties().getApproximateMessagesCount();
 
         queueClient.peekMessages(count, null, 
null).forEach(this::acknowledgeReceived);
-
-    }
-
-    private void putRecords() {
-        Map<String, String> messageParameters = new HashMap<>();
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            try {
-                // This is for 3.4 only. From 3.5 and newer, the text is taken 
from the body
-                messageParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"CamelAzureStorageQueueMessageText", "test " + i);
-
-                
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + 
i, messageParameters);
-            } catch (ExecutionException e) {
-                LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-            } catch (InterruptedException e) {
-                break;
-            }
-        }
-    }
-
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
-
-        putRecords();
-
-        consume();
-
-        assertEquals(expect, received, "Did not receive the same amount of 
messages that were sent");
     }
 
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() throws InterruptedException, 
ExecutionException, IOException {
+    public void testBasicSendReceive() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = 
service.azureCredentials();
 
         ConnectorPropertyFactory connectorPropertyFactory = 
CamelSinkAzureStorageQueuePropertyFactory
                 .basic()
                 .withConfiguration(TestQueueConfiguration.class.getName())
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withAccessKey(azureCredentialsHolder.accountKey())
                 .withAccountName(azureCredentialsHolder.accountName())
                 .withOperation("sendMessage")
                 .withQueueName(queueName);
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUrl() throws InterruptedException, 
ExecutionException, IOException {
+    public void testBasicSendReceiveUrl() throws Exception {
         AzureCredentialsHolder azureCredentialsHolder = 
service.azureCredentials();
 
         ConnectorPropertyFactory connectorPropertyFactory = 
CamelSinkAzureStorageQueuePropertyFactory
                 .basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConfiguration(TestQueueConfiguration.class.getName())
                 .withUrl(azureCredentialsHolder.accountName() + "/" + 
queueName)
                 .append("accessKey", azureCredentialsHolder.accountKey())
                 .append("operation", "sendMessage")
                 .buildUrl();
 
-        runTest(connectorPropertyFactory);
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 }

Reply via email to