This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6e7f97b852a81764ce71f359d19bb5f39a24fa0 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Mon Feb 8 20:00:03 2021 +0100 Converted the MongoDB source test case to use the reusable source base class --- .../mongodb/source/CamelSourceMongoDBITCase.java | 49 +++++++++------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java index 9260f05..5c4fa5d 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java @@ -26,46 +26,41 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.CreateCollectionOptions; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; +import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.test.infra.mongodb.services.MongoDBService; import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.bson.Document; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; @Disabled(value = "Disabled due to issue #974") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class CamelSourceMongoDBITCase extends AbstractKafkaTest { +public class CamelSourceMongoDBITCase extends CamelSourceTestSupport { @RegisterExtension public static MongoDBService mongoDBService = MongoDBServiceFactory.createService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceMongoDBITCase.class); - private MongoClient mongoClient; + private String topicName; private final int expect = 10; - private int received; @Override protected String[] getConnectorsInTest() { return new String[]{"camel-mongodb-kafka-connector"}; } - @BeforeEach - public void setUp() { + @BeforeAll + public void setUpDb() { mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl()); MongoDatabase database = mongoClient.getDatabase("testDatabase"); @@ -95,25 +90,19 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest { collection.insertMany(documents); } - private <T> boolean checkRecord(ConsumerRecord<String, T> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; + @BeforeEach + public void setUp() { + topicName = getTopicForTest(this); } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + @Override + protected void produceTestData() { + // NO-OP: static data already produced on the DB setup method + } - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); + @Override + protected void verifyMessages(TestMessageConsumer<?> consumer) { + int received = consumer.consumedMessages().size(); assertEquals(received, expect, "Didn't process the expected amount of messages"); } @@ -125,13 +114,13 @@ public class CamelSourceMongoDBITCase extends AbstractKafkaTest { mongoDBService.getReplicaSetUrl()); ConnectorPropertyFactory factory = CamelMongoDBPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) + .withKafkaTopic(topicName) .withConnectionBean("mongo", BasicConnectorPropertyFactory.classRef(connectionBeanRef)) .withDatabase("testDatabase") .withCollection("testCollection") .withCreateCollection(true); - runTest(factory); + runTest(factory, topicName, expect); } }