This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 8ea604c25e72a59eb7f58194264e6b8266ad5acc
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 11:58:20 2021 +0100

    Convert the Couchbase tests to the new reusable sink test base class
---
 .../couchbase/sink/CamelSinkCouchbaseITCase.java | 100 +++++++++------------
 1 file changed, 43 insertions(+), 57 deletions(-)

diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index caacfab..15104ac 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -21,9 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.couchbase.client.core.diagnostics.EndpointPingReport;
@@ -35,9 +32,8 @@ import com.couchbase.client.java.json.JsonObject;
 import com.couchbase.client.java.manager.bucket.BucketSettings;
 import com.couchbase.client.java.query.QueryResult;
 import org.apache.camel.kafkaconnector.CamelSinkTask;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
 import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
@@ -54,7 +50,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /*
@@ -66,7 +61,7 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
+public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static CouchbaseService service = CouchbaseServiceFactory.getService();
 
@@ -99,7 +94,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
 
         LOG.debug("Bucket created");
 
-        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+        topic = getTopicForTest(this);
 
         try {
             String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
@@ -111,18 +106,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         }
     }
 
-    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
-        entries.getValue().forEach(this::checkStatus);
-    }
-
-    private void checkStatus(EndpointPingReport endpointPingReport) {
-        if (endpointPingReport.state() == PingState.OK) {
-            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
-        } else {
-            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
-        }
-    }
-
     @AfterEach
     public void tearDown() {
         LOG.debug("Dropping the test bucket named {}", bucketName);
@@ -132,28 +115,51 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         cluster.disconnect();
     }
 
-    private void produceMessages(CountDownLatch latch) {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", current));
 
-        try {
-            for (int i = 0; i < expect; i++) {
-                Map<String, String> parameters = new HashMap<>();
+        return jsonObject.toString();
+    }
 
-                parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(i));
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> parameters = new HashMap<>();
 
-                JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", i));
+        parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(current));
 
-                try {
-                    kafkaClient.produce(topic, jsonObject.toString(), parameters);
-                } catch (ExecutionException e) {
-                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-                } catch (InterruptedException e) {
-                    break;
-                }
-            }
+        return parameters;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            TestUtils.waitFor(this::waitForMinimumRecordCount);
         } finally {
             latch.countDown();
         }
+
+    }
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            verifyRecords();
+        } else {
+            fail("Failed to receive the records within the specified time");
+        }
+    }
+
+    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
+        entries.getValue().forEach(this::checkStatus);
+    }
+
+    private void checkStatus(EndpointPingReport endpointPingReport) {
+        if (endpointPingReport.state() == PingState.OK) {
+            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
+        } else {
+            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
+        }
     }
 
     private boolean waitForMinimumRecordCount() {
@@ -191,26 +197,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         LOG.debug("Received record: {}", results.get(0));
     }
 
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        LOG.debug("Creating the producer and sending messages ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> produceMessages(latch));
-
-        assertTrue(TestUtils.waitFor(this::waitForMinimumRecordCount));
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            verifyRecords();
-        } else {
-            fail("Failed to receive the records within the specified time");
-        }
-    }
-
     @Disabled("Not formatting the URL correctly - issue #629")
     @Test
     @Timeout(90)
@@ -224,7 +210,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                 .withUsername(service.getUsername())
                 .withPassword(service.getPassword());
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 
     @RepeatedTest(10)
@@ -243,6 +229,6 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
                 .buildUrl();
 
 
-        runTest(factory);
+        runTest(factory, topic, expect);
     }
 }