Repository: camel Updated Branches: refs/heads/master 8b5b1abf4 -> 490478749
Component docs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49047874 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49047874 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49047874 Branch: refs/heads/master Commit: 490478749f478aec1d09cfa9aa7537d0875a4218 Parents: 8b5b1ab Author: Claus Ibsen <davscl...@apache.org> Authored: Tue May 12 09:21:29 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue May 12 09:21:29 2015 +0200 ---------------------------------------------------------------------- .../ElasticsearchConfiguration.java | 118 +++++++++++-------- .../elasticsearch/ElasticsearchEndpoint.java | 25 ++-- .../ElasticsearchConfigurationTest.java | 38 +++--- 3 files changed, 98 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/49047874/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index f33105a..a2933b9 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -35,7 +35,6 @@ import org.elasticsearch.node.NodeBuilder; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; - @UriParams public class ElasticsearchConfiguration { @@ -64,33 +63,29 @@ public class ElasticsearchConfiguration { private static final String IP_PORT_SEPARATOR_REGEX = ":"; private URI uri; - @UriPath(description = "Name of cluster or use local for local mode") - @Metadata(required = "true") + private boolean local; + private List<InetSocketTransportAddress> transportAddressesList; + + @UriPath @Metadata(required = "true") private String clusterName; - @UriParam - private String protocolType; - @UriParam - private String authority; + @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true") + private String operation; @UriParam private String indexName; @UriParam private String indexType; - @UriParam - private WriteConsistencyLevel consistencyLevel; - @UriParam - private ReplicationType replicationType; - @UriParam - private boolean local; + @UriParam(defaultValue = "DEFAULT") + private WriteConsistencyLevel consistencyLevel = DEFAULT_CONSISTENCY_LEVEL; + @UriParam(defaultValue = "DEFAULT") + private ReplicationType replicationType = DEFAULT_REPLICATION_TYPE; @UriParam private Boolean data; - @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") - private String operation; @UriParam private String ip; @UriParam - private List<InetSocketTransportAddress> transportAddresses; - @UriParam - private Integer port; + private String transportAddresses; + @UriParam(defaultValue = "9300") + private int port = DEFAULT_PORT; public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception { String protocol = uri.getScheme(); @@ -99,18 +94,17 @@ public class ElasticsearchConfiguration { throw new IllegalArgumentException("unrecognized elasticsearch protocol: " + protocol + " for uri: " + uri); } setUri(uri); - setAuthority(uri.getAuthority()); - if (!isValidAuthority()) { + if (!isValidAuthority(uri.getAuthority())) { throw new URISyntaxException(uri.toASCIIString(), "incorrect URI syntax specified for the elasticsearch endpoint." + "please specify the syntax as \"elasticsearch:[Cluster Name | 'local']?[Query]\""); } - if (LOCAL_NAME.equals(getAuthority())) { + if (LOCAL_NAME.equals(uri.getAuthority())) { setLocal(true); setClusterName(null); } else { setLocal(false); - setClusterName(getAuthority()); + setClusterName(uri.getAuthority()); } data = toBoolean(parameters.remove(PARAM_DATA)); @@ -130,12 +124,20 @@ public class ElasticsearchConfiguration { replicationType = parseReplicationType(parameters); ip = (String)parameters.remove(IP); - transportAddresses = parseTransportAddresses((String) parameters.remove(TRANSPORT_ADDRESSES)); + transportAddresses = (String) parameters.remove(TRANSPORT_ADDRESSES); + transportAddressesList = parseTransportAddresses(transportAddresses); String portParam = (String) parameters.remove(PORT); port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam); } + private static boolean isValidAuthority(String authority) throws URISyntaxException { + if (authority.contains(":")) { + return false; + } + return true; + } + private ReplicationType parseReplicationType(Map<String, Object> parameters) { Object replicationTypeParam = parameters.remove(PARAM_REPLICATION_TYPE); if (replicationTypeParam != null) { @@ -159,7 +161,7 @@ public class ElasticsearchConfiguration { return null; } List<String> addressesStr = Arrays.asList(ipsString.split(TRANSPORT_ADDRESSES_SEPARATOR_REGEX)); - List<InetSocketTransportAddress> addressesTrAd = new ArrayList(addressesStr.size()); + List<InetSocketTransportAddress> addressesTrAd = new ArrayList<InetSocketTransportAddress>(addressesStr.size()); for (String address : addressesStr) { String[] split = address.split(IP_PORT_SEPARATOR_REGEX); String hostname; @@ -192,14 +194,6 @@ public class ElasticsearchConfiguration { return builder.node(); } - private boolean isValidAuthority() throws URISyntaxException { - if (authority.contains(":")) { - return false; - } - return true; - - } - public URI getUri() { return uri; } @@ -208,34 +202,24 @@ public class ElasticsearchConfiguration { this.uri = uri; } - public String getProtocolType() { - return protocolType; - } - - public void setProtocolType(String protocolType) { - this.protocolType = protocolType; - } - public String getClusterName() { return clusterName; } + /** + * Name of cluster or use local for local mode + */ public void setClusterName(String clusterName) { this.clusterName = clusterName; } - public String getAuthority() { - return authority; - } - - public void setAuthority(String authority) { - this.authority = authority; - } - public String getIndexName() { return indexName; } + /** + * The name of the index to act against + */ public void setIndexName(String indexName) { this.indexName = indexName; } @@ -244,6 +228,9 @@ public class ElasticsearchConfiguration { return indexType; } + /** + * The type of the index to act against + */ public void setIndexType(String indexType) { this.indexType = indexType; } @@ -260,10 +247,16 @@ public class ElasticsearchConfiguration { return data; } + /** + * Is the node going to be allowed to allocate data (shards) to it or not. This setting map to the <tt>node.data</tt> setting. + */ public void setData(boolean data) { this.data = data; } + /** + * What operation to perform + */ public void setOperation(String operation) { this.operation = operation; } @@ -276,26 +269,44 @@ public class ElasticsearchConfiguration { return ip; } + /** + * The TransportClient remote host ip to use + */ public void setIp(String ip) { this.ip = ip; } - public List<InetSocketTransportAddress> getTransportAddresses() { + public List<InetSocketTransportAddress> getTransportAddressesList() { + return transportAddressesList; + } + + public String getTransportAddresses() { return transportAddresses; } - public void setTransportAddresses(List<InetSocketTransportAddress> transportAddresses) { + /** + * Comma separated list with ip:port formatted remote transport addresses to use. + * The ip and port options must be left blank for transportAddresses to be considered instead. + */ + public void setTransportAddresses(String transportAddresses) { this.transportAddresses = transportAddresses; + this.transportAddressesList = parseTransportAddresses(transportAddresses); } - public Integer getPort() { + public int getPort() { return port; } - public void setPort(Integer port) { + /** + * The TransportClient remote port to use (defaults to 9300) + */ + public void setPort(int port) { this.port = port; } + /** + * The write consistency level to use with INDEX and BULK operations (can be any of ONE, QUORUM, ALL or DEFAULT) + */ public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { this.consistencyLevel = consistencyLevel; } @@ -304,6 +315,9 @@ public class ElasticsearchConfiguration { return consistencyLevel; } + /** + * The replication type to use with INDEX and BULK operations (can be any of SYNC, ASYNC or DEFAULT) + */ public void setReplicationType(ReplicationType replicationType) { this.replicationType = replicationType; } http://git-wip-us.apache.org/repos/asf/camel/blob/49047874/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java index ec4e0c2..61e5a4f 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java @@ -20,10 +20,10 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; + import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -60,7 +60,7 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { } public Consumer createConsumer(Processor processor) throws Exception { - throw new RuntimeCamelException("Cannot consume to a ElasticsearchEndpoint: " + getEndpointUri()); + throw new UnsupportedOperationException("Cannot consume from an ElasticsearchEndpoint: " + getEndpointUri()); } public boolean isSingleton() { @@ -68,6 +68,7 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { } @Override + @SuppressWarnings("unchecked") protected void doStart() throws Exception { super.doStart(); if (configuration.isLocal()) { @@ -79,14 +80,14 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { this.client = new TransportClient(getSettings()) .addTransportAddress(new InetSocketTransportAddress(configuration.getIp(), configuration.getPort())); - } else if (configuration.getTransportAddresses() != null - && !configuration.getTransportAddresses().isEmpty()) { - List<TransportAddress> addresses = new ArrayList(configuration.getTransportAddresses().size()); - for (TransportAddress address : configuration.getTransportAddresses()) { + } else if (configuration.getTransportAddressesList() != null + && !configuration.getTransportAddressesList().isEmpty()) { + List<TransportAddress> addresses = new ArrayList(configuration.getTransportAddressesList().size()); + for (TransportAddress address : configuration.getTransportAddressesList()) { addresses.add(address); } this.client = new TransportClient(getSettings()) - .addTransportAddresses(addresses.toArray(new TransportAddress[0])); + .addTransportAddresses(addresses.toArray(new TransportAddress[addresses.size()])); } else { node = configuration.buildNode(); client = node.client(); @@ -95,16 +96,16 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { private Settings getSettings() { return ImmutableSettings.settingsBuilder() - // setting the classloader here will allow the underlying elasticsearch-java - // class to find its names.txt in an OSGi environment (otherwise the thread - // classloader is used, which won't be able to see the file causing a startup + // setting the classloader here will allow the underlying elasticsearch-java + // class to find its names.txt in an OSGi environment (otherwise the thread + // classloader is used, which won't be able to see the file causing a startup // exception). .classLoader(Settings.class.getClassLoader()) .put("cluster.name", configuration.getClusterName()) .put("client.transport.ignore_cluster_name", false) - .put("node.client", true) + .put("node.client", true) .put("client.transport.sniff", true) - .build(); + .build(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/49047874/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java index f025edf..6690a5c 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java @@ -79,7 +79,7 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { assertFalse(conf.isLocal()); assertFalse(conf.isData()); assertEquals("127.0.0.1", conf.getIp()); - assertEquals(9300, conf.getPort().intValue()); + assertEquals(9300, conf.getPort()); } @Test @@ -146,9 +146,9 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { Map<String, Object> parameters = URISupport.parseParameters(uri); ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertDefaultConfigurationParameters(conf); - assertEquals(1, conf.getTransportAddresses().size()); - assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); - assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort()); + assertEquals(1, conf.getTransportAddressesList().size()); + assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); + assertEquals(9300, conf.getTransportAddressesList().get(0).address().getPort()); } @Test @@ -158,11 +158,11 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { Map<String, Object> parameters = URISupport.parseParameters(uri); ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertDefaultConfigurationParameters(conf); - assertEquals(2, conf.getTransportAddresses().size()); - assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); - assertEquals(9300, conf.getTransportAddresses().get(0).address().getPort()); - assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString()); - assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort()); + assertEquals(2, conf.getTransportAddressesList().size()); + assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); + assertEquals(9300, conf.getTransportAddressesList().get(0).address().getPort()); + assertEquals("127.0.0.2", conf.getTransportAddressesList().get(1).address().getHostString()); + assertEquals(9300, conf.getTransportAddressesList().get(1).address().getPort()); } @Test @@ -172,9 +172,9 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { Map<String, Object> parameters = URISupport.parseParameters(uri); ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertDefaultConfigurationParameters(conf); - assertEquals(1, conf.getTransportAddresses().size()); - assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); - assertEquals(9305, conf.getTransportAddresses().get(0).address().getPort()); + assertEquals(1, conf.getTransportAddressesList().size()); + assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); + assertEquals(9305, conf.getTransportAddressesList().get(0).address().getPort()); } @Test @@ -184,13 +184,13 @@ public class ElasticsearchConfigurationTest extends CamelTestSupport { Map<String, Object> parameters = URISupport.parseParameters(uri); ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertDefaultConfigurationParameters(conf); - assertEquals(3, conf.getTransportAddresses().size()); - assertEquals("127.0.0.1", conf.getTransportAddresses().get(0).address().getHostString()); - assertEquals(9400, conf.getTransportAddresses().get(0).address().getPort()); - assertEquals("127.0.0.2", conf.getTransportAddresses().get(1).address().getHostString()); - assertEquals(9300, conf.getTransportAddresses().get(1).address().getPort()); - assertEquals("127.0.0.3", conf.getTransportAddresses().get(2).address().getHostString()); - assertEquals(9401, conf.getTransportAddresses().get(2).address().getPort()); + assertEquals(3, conf.getTransportAddressesList().size()); + assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); + assertEquals(9400, conf.getTransportAddressesList().get(0).address().getPort()); + assertEquals("127.0.0.2", conf.getTransportAddressesList().get(1).address().getHostString()); + assertEquals(9300, conf.getTransportAddressesList().get(1).address().getPort()); + assertEquals("127.0.0.3", conf.getTransportAddressesList().get(2).address().getHostString()); + assertEquals(9401, conf.getTransportAddressesList().get(2).address().getPort()); } private void assertDefaultConfigurationParameters(ElasticsearchConfiguration conf) {