This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 5ca2ddf6e168f3331caa0c92da569e2f68be787e
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Feb 3 13:53:46 2021 +0100

    Convert the MongoDB tests to the new reusable sink test base class
---
 .../mongodb/sink/CamelSinkMongoDBITCase.java       | 76 ++++++++++++----------
 1 file changed, 42 insertions(+), 34 deletions(-)

diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
index eb4cf2f..da1b02a 100644
--- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
+++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java
@@ -17,16 +17,16 @@
 
 package org.apache.camel.kafkaconnector.mongodb.sink;
 
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.mongodb.services.MongoDBService;
 import org.apache.camel.test.infra.mongodb.services.MongoDBServiceFactory;
@@ -43,13 +43,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
+public class CamelSinkMongoDBITCase extends CamelSinkTestSupport {
     @RegisterExtension
     public static MongoDBService mongoDBService = MongoDBServiceFactory.createService();
 
     private static final Logger LOG = LoggerFactory.getLogger(CamelMongoDBPropertyFactory.class);
 
     private MongoClient mongoClient;
+    private String topicName;
+    private final String databaseName = "testDB";
+    private final String collectionName = "testRecords";
 
     private final int expect = 10;
 
@@ -61,28 +64,44 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
+        topicName = getTopicForTest(this);
         mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl());
     }
 
-    private void putRecords() {
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+    @Override
+    protected String testMessageContent(int current) {
+        return String.format("{\"test\": \"value %d\"}", current);
+    }
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        return null;
+    }
 
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
         try {
-            for (int i = 0; i < expect; i++) {
-                String data = String.format("{\"test\": \"value %d\"}", i);
-
-                kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), data);
-            }
-
-        } catch (ExecutionException e) {
-            LOG.error("Unable to produce messages: {}", e.getMessage(), e);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("The thread putting records to Kafka was interrupted");
-            fail("The thread putting records to Kafka was interrupted");
+            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
+            MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
+
+            LOG.info("Waiting for data on the MongoDB instance");
+            TestUtils.waitFor(() -> hasAllRecords(collection));
+        } finally {
+            latch.countDown();
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+        if (latch.await(15, TimeUnit.SECONDS)) {
+            String databaseName = "testDB";
+            String collectionName = "testRecords";
+
+            verifyDocuments(databaseName, collectionName);
+        } else {
+            fail("Failed to receive the messages within the specified time");
+        }
+    }
     private boolean hasAllRecords(MongoCollection<Document> collection) {
         return collection.countDocuments() >= expect;
     }
@@ -91,34 +110,23 @@ public class CamelSinkMongoDBITCase extends AbstractKafkaTest {
         MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
         MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
 
-        TestUtils.waitFor(() -> hasAllRecords(collection));
-
         assertEquals(expect, collection.countDocuments());
     }
 
-    public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException {
-        propertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(propertyFactory, 1);
-
-        putRecords();
-    }
-
     @Test
-    @Timeout(90)
-    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+    @Timeout(30)
+    public void testBasicSendReceive() throws Exception {
        String connectionBeanRef = String.format("com.mongodb.client.MongoClients#create('%s')",
                 mongoDBService.getReplicaSetUrl());
 
         CamelMongoDBPropertyFactory factory = CamelMongoDBPropertyFactory.basic()
-                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withTopics(topicName)
                 .withConnectionBean("mongo",
                         BasicConnectorPropertyFactory.classRef(connectionBeanRef))
                 .withDatabase("testDB")
                 .withCollection("testRecords")
                 .withOperation("insert");
 
-        runTest(factory);
-
-        verifyDocuments("testDB", "testRecords");
+        runTest(factory, topicName, expect);
     }
 }
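A note for readers unfamiliar with the new base class: CamelSinkTestSupport itself is not part of this diff, so the outline below is only a rough sketch of how its template-method contract appears to fit together, inferred from the overrides above (testMessageContent, messageHeaders, consumeMessages, verifyMessages) and the runTest(factory, topicName, expect) call. The produce() hook, the executor, and the latch wiring are assumptions, not the real implementation, and the connector-factory parameter is omitted to keep the sketch self-contained.

    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical outline only; the method names mirror the overrides in
    // CamelSinkMongoDBITCase, but the flow is inferred, not the actual
    // CamelSinkTestSupport implementation.
    abstract class SinkTestSupportSketch {

        // Payload for the i-th test message; the MongoDB test returns a
        // small JSON document here.
        protected abstract String testMessageContent(int current);

        // Optional per-message headers; the MongoDB test returns null.
        protected abstract Map<String, String> messageHeaders(String text, int current);

        // Worker-thread side: blocks until the sink has written all records,
        // then releases the latch (mirrors the try/finally in the diff).
        protected abstract void consumeMessages(CountDownLatch latch);

        // Test-thread side: awaits the latch and asserts on the results.
        protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException;

        // Stand-in for the Kafka producer call; hypothetical.
        protected abstract void produce(String topic, String payload, Map<String, String> headers);

        // Presumed driver: produce `count` messages to `topic`, consume on a
        // worker thread, and verify on the test thread.
        protected void runTest(String topic, int count) throws Exception {
            for (int i = 0; i < count; i++) {
                String payload = testMessageContent(i);
                produce(topic, payload, messageHeaders(payload, i));
            }

            CountDownLatch latch = new CountDownLatch(1);
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                executor.submit(() -> consumeMessages(latch));
                verifyMessages(latch);
            } finally {
                executor.shutdownNow();
            }
        }
    }

One design point visible in the diff itself: the latch-based handoff lets consumeMessages block on the MongoDB side while verifyMessages enforces its own 15-second budget, independent of the method-level @Timeout(30).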