This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 34dec6e33b7b6324fb1b6d57a400e6f47cebbe50 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Feb 3 12:07:53 2021 +0100 Convert the ElasticSearch tests to the new reusable sink test base class --- .../sink/CamelSinkElasticSearchITCase.java | 159 ++++++++------------- 1 file changed, 60 insertions(+), 99 deletions(-) diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java index 8358aac..c80f892 100644 --- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java +++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java @@ -17,16 +17,12 @@ package org.apache.camel.kafkaconnector.elasticsearch.sink; +import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; import org.apache.camel.kafkaconnector.elasticsearch.clients.ElasticSearchClient; import org.apache.camel.kafkaconnector.elasticsearch.common.ElasticSearchCommon; import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchService; @@ -41,26 +37,28 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", disabledReason = "Hangs when running with the embedded Kafka Connect instance") -public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { +public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport { @RegisterExtension public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService(); private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class); private ElasticSearchClient client; + private String topicName; private final int expect = 10; private int received; private final String transformKey = "index-test"; + @Override protected String[] getConnectorsInTest() { return new String[] {"camel-elasticsearch-rest-kafka-connector"}; @@ -68,32 +66,45 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { + topicName = getTopicForTest(this); client = new ElasticSearchClient(elasticSearch.getElasticSearchHost(), elasticSearch.getPort(), ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX); received = 0; } - private void putRecords(CountDownLatch latch) { - LOG.debug("Sending records to Kafka"); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + @Override + protected void consumeMessages(CountDownLatch latch) { try { - for (int i = 0; i < expect; i++) { - try { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "test"); - } catch (ExecutionException e) { - LOG.error("Unable to produce messages: {}", e.getMessage(), e); - } catch (InterruptedException e) { - break; - } - } + client.waitForIndex(); + + LOG.debug("Waiting for data"); + client.waitForData(expect); } finally { latch.countDown(); } } + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(30, TimeUnit.SECONDS)) { + SearchHits hits = client.getData(); + assertNotNull(hits); + + hits.forEach(this::verifyHit); + assertEquals(expect, received, + "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + private void verifyHit(SearchHit searchHit) { String source = searchHit.getSourceAsString(); @@ -107,91 +118,41 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { received++; } - public void runTest(ConnectorPropertyFactory propertyFactory) throws ExecutionException, InterruptedException { - propertyFactory.log(); - LOG.debug("Performing initialization"); - getKafkaConnectService().initializeConnector(propertyFactory); - - LOG.debug("Initialization complete"); - CountDownLatch latch = new CountDownLatch(1); - ExecutorService service = Executors.newCachedThreadPool(); - service.submit(() -> putRecords(latch)); - - LOG.debug("Waiting for records"); - if (!latch.await(30, TimeUnit.SECONDS)) { - fail("Timed out wait for data to be added to the Kafka cluster"); - } - - LOG.debug("Waiting for indices"); - - client.waitForIndex(); - - LOG.debug("Waiting for data"); - client.waitForData(expect); - - SearchHits hits = client.getData(); - - assertNotNull(hits); - - hits.forEach(this::verifyHit); - assertEquals(expect, received, "Did not receive the same amount of messages sent"); - - LOG.debug("Created the consumer ... About to receive messages"); - } - @Test @Timeout(90) - public void testIndexOperation() { - try { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory - .basic() - .withTopics(topic) - .withOperation("Index") - .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER) - .withHostAddress(elasticSearch.getHttpHostAddress()) - .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX) - .withTransformsConfig("ElasticSearchTransforms") - .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms") - .withEntry("key", transformKey) - .end(); - - runTest(propertyFactory); - - LOG.debug("Created the consumer ... About to receive messages"); - } catch (Exception e) { - LOG.error("ElasticSearch test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testIndexOperation() throws Exception { + ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory + .basic() + .withTopics(topicName) + .withOperation("Index") + .withClusterName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER) + .withHostAddress(elasticSearch.getHttpHostAddress()) + .withIndexName(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX) + .withTransformsConfig("ElasticSearchTransforms") + .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms") + .withEntry("key", transformKey) + .end(); + + runTest(propertyFactory, topicName, expect); } @Test @Timeout(90) - public void testIndexOperationUsingUrl() { - try { - String topic = TestUtils.getDefaultTestTopic(this.getClass()); - - ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory - .basic() - .withTopics(topic) - .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER) - .append("hostAddresses", elasticSearch.getHttpHostAddress()) - .append("operation", "Index") - .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX) - .buildUrl() - .withTransformsConfig("ElasticSearchTransforms") - .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms") - .withEntry("key", transformKey) - .end(); - - runTest(propertyFactory); - - LOG.debug("Created the consumer ... About to receive messages"); - } catch (Exception e) { - LOG.error("ElasticSearch test failed: {}", e.getMessage(), e); - fail(e.getMessage()); - } + public void testIndexOperationUsingUrl() throws Exception { + ConnectorPropertyFactory propertyFactory = CamelElasticSearchPropertyFactory + .basic() + .withTopics(topicName) + .withUrl(ElasticSearchCommon.DEFAULT_ELASTICSEARCH_CLUSTER) + .append("hostAddresses", elasticSearch.getHttpHostAddress()) + .append("operation", "Index") + .append("indexName", ElasticSearchCommon.DEFAULT_ELASTICSEARCH_INDEX) + .buildUrl() + .withTransformsConfig("ElasticSearchTransforms") + .withEntry("type", "org.apache.camel.kafkaconnector.elasticsearchrest.transformers.ConnectRecordValueToMapTransforms") + .withEntry("key", transformKey) + .end(); + + runTest(propertyFactory, topicName, expect); } }