This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d325b00  Adds support for running tests against external ElasticSearch instances
     new a3da701  Merge pull request #93 from orpiske/decouple-elasticsearch
d325b00 is described below

commit d325b00de41610c05e7b0345bfc69df03ebda6e8
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri Feb 14 18:09:00 2020 +0100

    Adds support for running tests against external ElasticSearch instances
---
 .../clients/elasticsearch/ElasticSearchClient.java |  4 +-
 .../ElasticSearchLocalContainerService.java        | 60 ++++++++++++++++++++++
 .../elasticsearch/ElasticSearchService.java        | 40 +++++++++++++++
 .../elasticsearch/ElasticSearchServiceFactory.java | 44 ++++++++++++++++
 .../elasticsearch/RemoteElasticSearchService.java  | 54 +++++++++++++++++++
 .../CamelSinkElasticSearchITCase.java              | 35 +++++--------
 6 files changed, 213 insertions(+), 24 deletions(-)

diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
index 41f0179..53944a4 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/elasticsearch/ElasticSearchClient.java
@@ -41,10 +41,10 @@ public class ElasticSearchClient {
     private final RestHighLevelClient client;
     private final String index;
 
-    public ElasticSearchClient(int port, String index) {
+    public ElasticSearchClient(String host, int port, String index) {
         client = new RestHighLevelClient(
                 RestClient.builder(
-                        new HttpHost("localhost", port, "http")));
+                        new HttpHost(host, port, "http")));
 
         this.index = index;
     }
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java
new file mode 100644
index 0000000..00a6b30
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchLocalContainerService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.elasticsearch;
+
+import org.apache.camel.kafkaconnector.TestCommon;
+import 
org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticSearchLocalContainerService implements 
ElasticSearchService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticSearchLocalContainerService.class);
+    private static final String DEFAULT_ELASTIC_SEARCH_CONTAINER = 
"docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2";
+    private static final int ELASTIC_SEARCH_PORT = 9200;
+
+    public ElasticsearchContainer container;
+
+    public ElasticSearchLocalContainerService() {
+        String containerName = System.getProperty("elasticsearch.container");
+
+        if (containerName == null || containerName.isEmpty()) {
+            containerName = DEFAULT_ELASTIC_SEARCH_CONTAINER;
+        }
+
+        container = new ElasticsearchContainer(containerName);
+        container.start();
+    }
+
+
+    @Override
+    public String getHttpHostAddress() {
+        return container.getHttpHostAddress();
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("ElasticSearch instance running at {}", getHttpHostAddress());
+    }
+
+    @Override
+    public ElasticSearchClient getClient() {
+        return new ElasticSearchClient("localhost", 
container.getMappedPort(ELASTIC_SEARCH_PORT),
+                TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java
new file mode 100644
index 0000000..6ce24b5
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.elasticsearch;
+
+import 
org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface ElasticSearchService extends BeforeAllCallback {
+
+    String getHttpHostAddress();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+
+    ElasticSearchClient getClient();
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception 
{
+        initialize();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java
new file mode 100644
index 0000000..4a56893
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/ElasticSearchServiceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.elasticsearch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ElasticSearchServiceFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticSearchServiceFactory.class);
+
+    private ElasticSearchServiceFactory() {
+
+    }
+
+    public static ElasticSearchService createService() {
+        String instanceType = 
System.getProperty("elasticsearch.instance.type");
+
+        if (instanceType == null || 
instanceType.equals("local-elasticsearch-container")) {
+            return new ElasticSearchLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RemoteElasticSearchService();
+        }
+
+        LOG.error("Cassandra instance must be one of 
'local-elasticsearch-container' or 'remote");
+        throw new UnsupportedOperationException("Invalid ElasticSearch 
instance type:");
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java
new file mode 100644
index 0000000..666ad8d
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/elasticsearch/RemoteElasticSearchService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.elasticsearch;
+
+import org.apache.camel.kafkaconnector.TestCommon;
+import 
org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
+
+public class RemoteElasticSearchService implements ElasticSearchService {
+    private static final int ELASTIC_SEARCH_PORT = 9200;
+
+    private int getPort() {
+        String strPort = System.getProperty("elasticsearch.port");
+
+        if (strPort != null) {
+            return Integer.parseInt(strPort);
+        }
+
+        return ELASTIC_SEARCH_PORT;
+    }
+
+    private String getHost() {
+        return System.getProperty("elasticsearch.host");
+    }
+
+    @Override
+    public String getHttpHostAddress() {
+        return getHost() + ":" + getPort();
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP
+    }
+
+    @Override
+    public ElasticSearchClient getClient() {
+        return new ElasticSearchClient(getHost(), getPort(), 
TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
index 5f1858a..5a34643 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/elasticsearch/CamelSinkElasticSearchITCase.java
@@ -27,18 +27,20 @@ import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.TestCommon;
 import 
org.apache.camel.kafkaconnector.clients.elasticsearch.ElasticSearchClient;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import 
org.apache.camel.kafkaconnector.services.elasticsearch.ElasticSearchService;
+import 
org.apache.camel.kafkaconnector.services.elasticsearch.ElasticSearchServiceFactory;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -46,13 +48,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 @Testcontainers
 public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelElasticSearchPropertyFactory.class);
-    // This is required in order to use the Open Source one by default
-    private static final String ELASTIC_SEARCH_CONTAINER = 
"docker.elastic.co/elasticsearch/elasticsearch-oss:7.3.2";
 
-    private static final int ELASTIC_SEARCH_PORT = 9200;
-
-    @Container
-    public ElasticsearchContainer elasticsearch = new 
ElasticsearchContainer(ELASTIC_SEARCH_CONTAINER);
+    @RegisterExtension
+    public ElasticSearchService elasticSearch = 
ElasticSearchServiceFactory.createService();
 
     private ElasticSearchClient client;
 
@@ -62,15 +60,7 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        final String elasticSearchInstance = elasticsearch
-                .getHttpHostAddress();
-
-        LOG.info("ElasticSearch instance running at {}", 
elasticSearchInstance);
-
-
-
-        client = new 
ElasticSearchClient(elasticsearch.getMappedPort(ELASTIC_SEARCH_PORT),
-                TestCommon.DEFAULT_ELASTICSEARCH_INDEX);
+        client = elasticSearch.getClient();
     }
 
     private void putRecords(CountDownLatch latch) {
@@ -111,13 +101,10 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
     @Timeout(90)
     public void testIndexOperation() {
         try {
-            final String elasticSearchInstance = elasticsearch
-                    .getHttpHostAddress();
-
             String topic = TestCommon.getDefaultTestTopic(this.getClass());
             CamelElasticSearchPropertyFactory testProperties = new 
CamelElasticSearchIndexPropertyFactory(1, topic,
                     TestCommon.DEFAULT_ELASTICSEARCH_CLUSTER,
-                    elasticSearchInstance, 
TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey);
+                    elasticSearch.getHttpHostAddress(), 
TestCommon.DEFAULT_ELASTICSEARCH_INDEX, transformKey);
 
             getKafkaConnectService().initializeConnector(testProperties);
 
@@ -125,7 +112,9 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
             ExecutorService service = Executors.newCachedThreadPool();
             service.submit(() -> putRecords(latch));
 
-            latch.await(30, TimeUnit.SECONDS);
+            if (!latch.await(30, TimeUnit.SECONDS)) {
+                fail("Timed out wait for data to be added to the Kafka 
cluster");
+            }
 
             LOG.debug("Waiting for indices");
 
@@ -136,6 +125,8 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
 
             SearchHits hits = client.getData();
 
+            assertNotNull(hits);
+
             hits.forEach(this::verifyHit);
             assertEquals(expect, received, "Did not receive the same amount of 
messages sent");
 

Reply via email to