This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 79a5f91e52c023affdaff1121c142113a0fda583 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 11:33:28 2021 +0100 Convert the Cassandra tests to the new reusable sink test base class --- .../cassandra/sink/CamelSinkCassandraITCase.java | 83 +++++++++------------- 1 file changed, 33 insertions(+), 50 deletions(-) diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java index 56a0930..2949fff 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java @@ -17,17 +17,14 @@ package org.apache.camel.kafkaconnector.cassandra.sink; +import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.cassandra.services.CassandraService; import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory; @@ -40,11 +37,11 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSinkCassandraITCase extends AbstractKafkaTest { +public class CamelSinkCassandraITCase extends CamelSinkTestSupport { @RegisterExtension public static CassandraService cassandraService = CassandraServiceFactory.createService(); @@ -52,6 +49,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { private CassandraClient cassandraClient; private TestDataDao testDataDao; + private String topicName; private final int expect = 10; private int received; @@ -63,6 +61,7 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); cassandraClient = new CassandraClient(cassandraService.getCassandraHost(), cassandraService.getCQL3Port()); testDataDao = cassandraClient.newTestDataDao(); @@ -70,6 +69,8 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { testDataDao.createKeySpace(); testDataDao.useKeySpace(); testDataDao.createTable(); + + received = 0; } @AfterEach @@ -83,81 +84,63 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest { } } - private void putRecords(CountDownLatch latch) { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + @Override + protected void consumeMessages(CountDownLatch latch) { try { - for (int i = 0; i < expect; i++) { - try { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test " + i); - } catch (ExecutionException e) { - LOG.error("Unable to produce messages: {}", e.getMessage(), e); - } catch (InterruptedException e) { - break; - } + if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) { + fail("Did not receive enough data"); } + testDataDao.getData(this::checkRetrievedData); } finally { latch.countDown(); } } - private void checkRetrievedData(String data) { - if (data != null) { - received++; + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals(expect, received, + "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail("Failed to receive the messages within the specified time"); } } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - CountDownLatch latch = new CountDownLatch(1); - ExecutorService service = Executors.newCachedThreadPool(); - service.submit(() -> putRecords(latch)); - - if (!latch.await(30, TimeUnit.SECONDS)) { - fail("Timed out wait for data to be added to the Kafka cluster"); - } - - if (!TestUtils.waitFor(testDataDao::hasEnoughData, (long) expect)) { - fail("Did not receive enough data"); + private void checkRetrievedData(String data) { + if (data != null) { + received++; } - testDataDao.getData(this::checkRetrievedData); - assertTrue(received >= expect, - String.format("Did not receive as much data as expected: %d < %d", received, expect)); - } @Timeout(90) @Test - public void testFetchFromCassandra() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - + public void testFetchFromCassandra() throws Exception { ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory .basic() - .withTopics(topic) + .withTopics(topicName) .withHosts(cassandraService.getCassandraHost()) .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) .withCql(testDataDao.getInsertStatement()); - runTest(connectorPropertyFactory); + runTest(connectorPropertyFactory, topicName, expect); } @Timeout(90) @Test - public void testFetchFromCassandraWithUrl() throws ExecutionException, InterruptedException { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - + public void testFetchFromCassandraWithUrl() throws Exception { ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory .basic() - .withTopics(topic) + .withTopics(topicName) .withUrl(cassandraService.getCQL3Endpoint(), TestDataDao.KEY_SPACE) .append("cql", testDataDao.getInsertStatement()) .buildUrl(); - runTest(connectorPropertyFactory); - + runTest(connectorPropertyFactory, topicName, expect); } }