This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 8ea604c25e72a59eb7f58194264e6b8266ad5acc
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 11:58:20 2021 +0100

    Convert the Couchbase tests to the new reusable sink test base class
---
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 100 +++++++++------------
 1 file changed, 43 insertions(+), 57 deletions(-)
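
For context, the conversion follows a single pattern: the test now extends CamelSinkTestSupport and supplies the payload, the Kafka headers, and the verification logic through overridden hooks, while the base class drives the connector and the Kafka producer. Below is a condensed, hypothetical sketch of that shape, based only on the hooks and the runTest(factory, topic, expect) call visible in the diff; the class name CamelSinkExampleITCase, the runSinkTest helper, and the placeholder bodies of waitForMinimumRecordCount/verifyRecords are illustrative and not part of the commit.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.kafkaconnector.CamelSinkTask;
    import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
    import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
    import org.apache.camel.kafkaconnector.common.utils.TestUtils;

    import static org.junit.jupiter.api.Assertions.fail;

    public class CamelSinkExampleITCase extends CamelSinkTestSupport {

        private final int expect = 10;
        private String topic; // a real test assigns this in its setup, e.g. via getTopicForTest(this)

        @Override
        protected String testMessageContent(int current) {
            // Payload of the current message sent to the sink connector
            return String.format("test-%d", current);
        }

        @Override
        protected Map<String, String> messageHeaders(String text, int current) {
            // Kafka record headers that the sink task converts into Camel headers
            Map<String, String> headers = new HashMap<>();
            headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
            return headers;
        }

        @Override
        protected void consumeMessages(CountDownLatch latch) {
            // Block until the destination system has received the expected records
            try {
                TestUtils.waitFor(this::waitForMinimumRecordCount);
            } finally {
                latch.countDown();
            }
        }

        @Override
        protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
            if (latch.await(110, TimeUnit.SECONDS)) {
                verifyRecords();
            } else {
                fail("Failed to receive the records within the specified time");
            }
        }

        // Placeholder helpers: the real test queries the destination data store
        private boolean waitForMinimumRecordCount() {
            return true;
        }

        private void verifyRecords() {
        }

        // A concrete test builds its connector-specific property factory and delegates to the base class
        public void runSinkTest(ConnectorPropertyFactory factory) throws Exception {
            runTest(factory, topic, expect);
        }
    }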

diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index caacfab..15104ac 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -21,9 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.couchbase.client.core.diagnostics.EndpointPingReport;
@@ -35,9 +32,8 @@ import com.couchbase.client.java.json.JsonObject;
 import com.couchbase.client.java.manager.bucket.BucketSettings;
 import com.couchbase.client.java.query.QueryResult;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
@@ -54,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /*
@@ -66,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
+public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static CouchbaseService service = CouchbaseServiceFactory.getService();
 
@@ -99,7 +94,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
 
         LOG.debug("Bucket created");
 
-        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+        topic = getTopicForTest(this);
 
         try {
             String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
@@ -111,18 +106,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         }
     }
 
-    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
-        entries.getValue().forEach(this::checkStatus);
-    }
-
-    private void checkStatus(EndpointPingReport endpointPingReport) {
-        if (endpointPingReport.state() == PingState.OK) {
-            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
-        } else {
-            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
-        }
-    }
-
     @AfterEach
     public void tearDown() {
         LOG.debug("Dropping the test bucket named {}", bucketName);
@@ -132,28 +115,51 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         cluster.disconnect();
     }
 
-    private void produceMessages(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> parameters = new HashMap<>();
+        return jsonObject.toString();
+    }
 
-                parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(i));
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> parameters = new HashMap<>();
 
-                JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", i));
+        parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
 
-                try {
-                    kafkaClient.produce(topic, jsonObject.toString(), parameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
+        return parameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(this::waitForMinimumRecordCount);
         } finally {
             latch.countDown();
         }
+
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            verifyRecords();
+        } else {
+            fail("Failed to receive the records within the specified time");
+        }
+    }
+
+    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
+        entries.getValue().forEach(this::checkStatus);
+    }
+
+    private void checkStatus(EndpointPingReport endpointPingReport) {
+        if (endpointPingReport.state() == PingState.OK) {
+            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
+        } else {
+            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
+        }
     }
 
     private boolean waitForMinimumRecordCount() {
@@ -191,26 +197,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         LOG.debug("Received record: {}", results.get(0));
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the producer and sending messages ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> produceMessages(latch));
-
-        assertTrue(TestUtils.waitFor(this::waitForMinimumRecordCount));
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            verifyRecords();
-        } else {
-            fail("Failed to receive the records within the specified time");
-        }
-    }
-
     @Disabled("Not formatting the URL correctly - issue #629")
     @Test
     @Timeout(90)
@@ -224,7 +210,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                 .withUsername(service.getUsername())
                 .withPassword(service.getPassword());
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 
     @RepeatedTest(10)
@@ -243,6 +229,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                     .buildUrl();
 
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 }