Updated Branches: refs/heads/master 1bb756cd2 -> 9ecc122b4
CAMEL-6444 added explicity ip/port support to camel-elasticsearch Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9ecc122b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9ecc122b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9ecc122b Branch: refs/heads/master Commit: 9ecc122b45848045a565901772ca1b3888d1cf4a Parents: 1bb756c Author: boday <bo...@apache.org> Authored: Wed Aug 7 07:55:21 2013 -0700 Committer: boday <bo...@apache.org> Committed: Wed Aug 7 07:55:21 2013 -0700 ---------------------------------------------------------------------- .../ElasticsearchConfiguration.java | 27 ++++++++++++++-- .../elasticsearch/ElasticsearchEndpoint.java | 15 +++++++-- .../ElasticsearchComponentTest.java | 33 ++++++++++++++++++-- 3 files changed, 69 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index a07ab0a..8ed17d2 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -19,10 +19,8 @@ package org.apache.camel.component.elasticsearch; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; - import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; - import static org.elasticsearch.node.NodeBuilder.nodeBuilder; public class ElasticsearchConfiguration { @@ -37,6 +35,9 @@ public class ElasticsearchConfiguration { public static final String PARAM_INDEX_TYPE = "indexType"; public static final String PROTOCOL = "elasticsearch"; private static final String LOCAL_NAME = "local"; + private static final String IP = "ip"; + private static final String PORT = "port"; + private static final Integer DEFAULT_PORT = 9300; private URI uri; private String protocolType; @@ -47,6 +48,8 @@ public class ElasticsearchConfiguration { private boolean local; private Boolean data; private String operation; + private String ip; + private Integer port; public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception { String protocol = uri.getScheme(); @@ -82,6 +85,9 @@ public class ElasticsearchConfiguration { indexName = (String)parameters.remove(PARAM_INDEX_NAME); indexType = (String)parameters.remove(PARAM_INDEX_TYPE); operation = (String)parameters.remove(PARAM_OPERATION); + ip = (String)parameters.remove(IP); + String portParam = (String) parameters.remove(PORT); + port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam); } protected Boolean toBoolean(Object string) { @@ -181,4 +187,21 @@ public class ElasticsearchConfiguration { public String getOperation() { return this.operation; } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java index f1d074a..4d002ea 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java @@ -18,13 +18,16 @@ package org.apache.camel.component.elasticsearch; import java.net.URI; import java.util.Map; - import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultEndpoint; import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +69,15 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { LOG.info("Joining ElasticSearch cluster " + config.getClusterName()); } node = config.buildNode(); - client = node.client(); + if (config.getIp() != null && !config.isLocal()) { + Settings settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", config.getClusterName()).put("node.client", true).build(); + Client client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress(config.getIp(), config.getPort())); + this.client = client; + } else { + client = node.client(); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/9ecc122b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 2f15011..cab6451 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -18,13 +18,12 @@ package org.apache.camel.component.elasticsearch; import java.util.HashMap; import java.util.Map; - import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; - import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class ElasticsearchComponentTest extends CamelTestSupport { @@ -104,6 +103,34 @@ public class ElasticsearchComponentTest extends CamelTestSupport { } @Test + @Ignore("need to setup the cluster IP for this test") + public void indexWithIp() throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + String indexId = template.requestBodyAndHeaders("direct:indexWithIp", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test + @Ignore("need to setup the cluster IP/Port for this test") + public void indexWithIpAndPort() throws Exception { + Map<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + } + + @Test public void testGetWithHeaders() throws Exception { //first, INDEX a value Map<String, String> map = new HashMap<String, String>(); @@ -162,6 +189,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport { from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet"); + //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost"); + //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300"); } }; }