This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d325b00 Adds support for running tests against external ElasticSearch instances new a3da701 Merge pull request #93 from orpiske/decouple-elasticsearch d325b00 is described below commit d325b00de41610c05e7b0345bfc69df03ebda6e8 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Feb 14 18:09:00 2020 +0100 Adds support for running tests against external ElasticSearch instances --- .../clients/elasticsearch/ElasticSearchClient.java | 4 +- .../ElasticSearchLocalContainerService.java | 60 ++++++++++++++++++++++ .../elasticsearch/ElasticSearchService.java | 40 +++++++++++++++ .../elasticsearch/ElasticSearchServiceFactory.java | 44 ++++++++++++++++ .../elasticsearch/RemoteElasticSearchService.java | 54 +++++++++++++++++++ .../CamelSinkElasticSearchITCase.java | 35 +++++-------- 6 files changed, 213 insertions(+), 24 deletions(-) diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java index 41f0179..53944a4 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java @@ -41,10 +41,10 @@ public class ElasticSearchClient { private final RestHighLevelClient client; private final String index; - public ElasticSearchClient(int port, String index) { + public ElasticSearchClient(String host, int port, String index) { client = new RestHighLevelClient( RestClient.builder( - new HttpHost("localhost", port, "http"))); + new HttpHost(host, port, "http"))); this.index = index; } diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java new file mode 100644 index 0000000..00a6b30 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.elasticsearch; + +import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.elasticsearch.ElasticsearchContainer; + +public class ElasticSearchLocalContainerService implements ElasticSearchService { + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchLocalContainerService.class); + private static final String DEFAULT_ELASTIC_SEARCH_CONTAINER = "docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2"; + private static final int ELASTIC_SEARCH_PORT = 9200; + + public ElasticsearchContainer container; + + public ElasticSearchLocalContainerService() { + String containerName = System.getProperty("elasticsearch.container"); + + if (containerName == null || containerName.isEmpty()) { + containerName = DEFAULT_ELASTIC_SEARCH_CONTAINER; + } + + container = new ElasticsearchContainer(containerName); + container.start(); + } + + + @Override + public String getHttpHostAddress() { + return container.getHttpHostAddress(); + } + + @Override + public void initialize() { + LOG.info("ElasticSearch instance running at {}", getHttpHostAddress()); + } + + @Override + public ElasticSearchClient getClient() { + return new ElasticSearchClient("localhost", container.getMappedPort(ELASTIC_SEARCH_PORT), + TestCommon.DEFAULT_ELASTICSEARCH_INDEX); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java new file mode 100644 index 0000000..6ce24b5 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.elasticsearch; + +import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public interface ElasticSearchService extends BeforeAllCallback { + + String getHttpHostAddress(); + + /** + * Perform any initialization necessary + */ + void initialize(); + + + ElasticSearchClient getClient(); + + @Override + default void beforeAll(ExtensionContext extensionContext) throws Exception { + initialize(); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java new file mode 100644 index 0000000..4a56893 --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.elasticsearch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ElasticSearchServiceFactory { + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchServiceFactory.class); + + private ElasticSearchServiceFactory() { + + } + + public static ElasticSearchService createService() { + String instanceType = System.getProperty("elasticsearch.instance.type"); + + if (instanceType == null || instanceType.equals("local-elasticsearch-container")) { + return new ElasticSearchLocalContainerService(); + } + + if (instanceType.equals("remote")) { + return new RemoteElasticSearchService(); + } + + LOG.error("Cassandra instance must be one of 'local-elasticsearch-container' or 'remote"); + throw new UnsupportedOperationException("Invalid ElasticSearch instance type:"); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java new file mode 100644 index 0000000..666ad8d --- /dev/null +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.services.elasticsearch; + +import org.apache.camel.kafkaconnector.TestCommon; +import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient; + +public class RemoteElasticSearchService implements ElasticSearchService { + private static final int ELASTIC_SEARCH_PORT = 9200; + + private int getPort() { + String strPort = System.getProperty("elasticsearch.port"); + + if (strPort != null) { + return Integer.parseInt(strPort); + } + + return ELASTIC_SEARCH_PORT; + } + + private String getHost() { + return System.getProperty("elasticsearch.host"); + } + + @Override + public String getHttpHostAddress() { + return getHost() + ":" + getPort(); + } + + @Override + public void initialize() { + // NO-OP + } + + @Override + public ElasticSearchClient getClient() { + return new ElasticSearchClient(getHost(), getPort(), TestCommon.DEFAULT_ELASTICSEARCH_INDEX); + } +} diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java index 5f1858a..5a34643 100644 --- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java +++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java @@ -27,18 +27,20 @@ import org.apache.camel.kafkaconnector.AbstractKafkaTest; import org.apache.camel.kafkaconnector.TestCommon; import org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient; import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.services.elasticsearch.ElasticSearchService; +import org.apache.camel.kafkaconnector.services.elasticsearch.ElasticSearchServiceFactory; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -46,13 +48,9 @@ import static org.junit.jupiter.api.Assertions.fail; @Testcontainers public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class); - // This is required in order to use the Open Source one by default - private static final String ELASTIC_SEARCH_CONTAINER = "docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2"; - private static final int ELASTIC_SEARCH_PORT = 9200; - - @Container - public ElasticsearchContainer elasticsearch = new ElasticsearchContainer(ELASTIC_SEARCH_CONTAINER); + @RegisterExtension + public ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService(); private ElasticSearchClient client; @@ -62,15 +60,7 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { @BeforeEach public void setUp() { - final String elasticSearchInstance = elasticsearch - .getHttpHostAddress(); - - LOG.info("ElasticSearch instance running at {}", elasticSearchInstance); - - - - client = new ElasticSearchClient(elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT), - TestCommon.DEFAULT_ELASTICSEARCH_INDEX); + client = elasticSearch.getClient(); } private void putRecords(CountDownLatch latch) { @@ -111,13 +101,10 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { @Timeout(90) public void testIndexOperation() { try { - final String elasticSearchInstance = elasticsearch - .getHttpHostAddress(); - String topic = TestCommon.getDefaultTestTopic(this.getClass()); CamelElasticSearchPropertyFactory testProperties = new CamelElasticSearchIndexPropertyFactory(1, topic, TestCommon.DEFAULT_ELASTICSEARCH_CLUSTER, - elasticSearchInstance, TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey); + elasticSearch.getHttpHostAddress(), TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey); getKafkaConnectService().initializeConnector(testProperties); @@ -125,7 +112,9 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { ExecutorService service = Executors.newCachedThreadPool(); service.submit(() -> putRecords(latch)); - latch.await(30, TimeUnit.SECONDS); + if (!latch.await(30, TimeUnit.SECONDS)) { + fail("Timed out wait for data to be added to the Kafka cluster"); + } LOG.debug("Waiting for indices"); @@ -136,6 +125,8 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest { SearchHits hits = client.getData(); + assertNotNull(hits); + hits.forEach(this::verifyHit); assertEquals(expect, received, "Did not receive the same amount of messages sent");