Author: boday Date: Thu Aug 2 04:52:36 2012 New Revision: 1368351 URL: http://svn.apache.org/viewvc?rev=1368351&view=rev Log: CAMEL-5481 added basic GET_BY_ID support to camel-elasticsearch and fixed some issues with the configuration class
Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java (original) +++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java Thu Aug 2 04:52:36 2012 @@ -28,21 +28,16 @@ import org.apache.camel.impl.DefaultComp */ public class ElasticsearchComponent extends DefaultComponent { - 
private ElasticsearchConfiguration config; - public ElasticsearchComponent() { super(); - config = new ElasticsearchConfiguration(); } public ElasticsearchComponent(CamelContext context) { super(context); - config = new ElasticsearchConfiguration(); } protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - config.parseURI(new URI(uri), parameters, this); - Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config); + Endpoint endpoint = new ElasticsearchEndpoint(uri, this, parameters); setProperties(endpoint, parameters); return endpoint; } Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java (original) +++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java Thu Aug 2 04:52:36 2012 @@ -29,6 +29,10 @@ import static org.elasticsearch.node.Nod public class ElasticsearchConfiguration { + public static final String PARAM_OPERATION = "operation"; + public static final String OPERATION_INDEX = "INDEX"; + public static final String OPERATION_GET_BY_ID = "GET_BY_ID"; + public static final String PARAM_INDEX_ID = "indexId"; public static final String PARAM_DATA = "data"; public static final String PARAM_INDEX_NAME = "indexName"; public static final String PARAM_INDEX_TYPE = "indexType"; @@ -44,16 +48,10 @@ public class ElasticsearchConfiguration private String indexType; private boolean local; private Boolean data; + private String 
operation; - public ElasticsearchConfiguration() { - } - - public ElasticsearchConfiguration(URI uri) throws Exception { - this(); - this.uri = uri; - } + public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception { - public void parseURI(URI uri, Map<String, Object> parameters, ElasticsearchComponent component) throws Exception { String protocol = uri.getScheme(); if (!protocol.equalsIgnoreCase(PROTOCOL)) { @@ -86,6 +84,7 @@ public class ElasticsearchConfiguration indexName = (String)parameters.remove(PARAM_INDEX_NAME); indexType = (String)parameters.remove(PARAM_INDEX_TYPE); + operation = (String)parameters.remove(PARAM_OPERATION); } protected Boolean toBoolean(Object string) { @@ -178,4 +177,11 @@ public class ElasticsearchConfiguration this.data = data; } + public void setOperation(String operation) { + this.operation = operation; + } + + public String getOperation() { + return this.operation; + } } Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java (original) +++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java Thu Aug 2 04:52:36 2012 @@ -16,6 +16,9 @@ */ package org.apache.camel.component.elasticsearch; +import java.net.URI; +import java.util.Map; + import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -37,9 +40,9 @@ public class ElasticsearchEndpoint exten private Client client; private ElasticsearchConfiguration 
config; - public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config) { + public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, Map<String, Object> parameters) throws Exception { super(uri, component); - this.config = config; + this.config = new ElasticsearchConfiguration(new URI(uri), parameters); } public Producer createProducer() throws Exception { @@ -91,4 +94,8 @@ public class ElasticsearchEndpoint exten return config; } + public void setOperation(String operation) { + config.setOperation(operation); + } + } Modified: camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java (original) +++ camel/trunk/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java Thu Aug 2 04:52:36 2012 @@ -23,8 +23,10 @@ import org.apache.camel.ExpectedBodyType import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -41,8 +43,46 @@ public class ElasticsearchProducer exten } public void process(Exchange exchange) throws Exception { + + String operation = (String) 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION); + if (operation == null) { + operation = endpoint.getConfig().getOperation(); + } + + if (operation == null) { + throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is missing"); + } + Client client = endpoint.getClient(); - log.debug("indexing " + exchange); + + if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_INDEX)) { + addToIndex(client, exchange); + } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_GET_BY_ID)) { + getById(client, exchange); + } else { + throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported"); + } + } + + public void getById(Client client, Exchange exchange) { + + String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); + if (indexName == null) { + indexName = endpoint.getConfig().getIndexName(); + } + + String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); + if (indexType == null) { + indexType = endpoint.getConfig().getIndexType(); + } + + String indexId = exchange.getIn().getBody(String.class); + + GetResponse response = client.prepareGet(indexName, indexType, indexId).execute().actionGet(); + exchange.getIn().setBody(response); + } + + public void addToIndex(Client client, Exchange exchange) { String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); if (indexName == null) { @@ -55,11 +95,13 @@ public class ElasticsearchProducer exten } IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType); + if (!setIndexRequestSource(exchange.getIn(), prepareIndex)) { throw new ExpectedBodyTypeException(exchange, XContentBuilder.class); } ListenableActionFuture<IndexResponse> future = prepareIndex.execute(); IndexResponse response = future.actionGet(); + 
exchange.getIn().setBody(response.getId()); } private boolean setIndexRequestSource(Message msg, IndexRequestBuilder builder) { Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java (original) +++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java Thu Aug 2 04:52:36 2012 @@ -17,31 +17,82 @@ package org.apache.camel.component.elasticsearch; import java.util.HashMap; +import java.util.Map; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.CamelTestSupport; +import org.elasticsearch.action.get.GetResponse; import org.junit.Test; public class ElasticsearchComponentTest extends CamelTestSupport { @Test public void testIndex() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMinimumMessageCount(1); - sendBody("direct:index", new HashMap<String, String>() { - { - put("content", "test"); - } - }); - assertMockEndpointsSatisfied(); + HashMap<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + String indexId = (String) template.requestBody("direct:index", map); + assertNotNull("indexId should be set", indexId); + } + + @Test + public void testGet() throws Exception { + //first, index a document + HashMap<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + sendBody("direct:index", map); + String indexId = (String) 
template.requestBody("direct:index", map); + assertNotNull("indexId should be set", indexId); + + //now, verify get succeeded + GetResponse response = (GetResponse) template.requestBody("direct:get", indexId); + assertNotNull("response should not be null", response); + assertNotNull("response source should not be null", response.getSource()); + } + + @Test + public void testIndexWithHeaders() throws Exception { + + HashMap<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers); + assertNotNull("indexId should be set", indexId); + } + + @Test + public void testGetWithHeaders() throws Exception { + + //first, INDEX a document + HashMap<String, String> map = new HashMap<String, String>(); + map.put("content", "test"); + + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + + String indexId = (String) template.requestBodyAndHeaders("direct:start", map, headers); + + //now, verify GET + headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID); + GetResponse response = (GetResponse) template.requestBodyAndHeaders("direct:start", indexId, headers); + assertNotNull("response should not be null", response); + assertNotNull("response source should not be null", response.getSource()); } @Override protected RouteBuilder createRouteBuilder() throws Exception { 
return new RouteBuilder() { public void configure() { - from("direct:index").to("elasticsearch://local?indexName=twitter&indexType=tweet").to("mock:result"); + from("direct:start").to("elasticsearch://local"); + from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); + from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); } }; } Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java (original) +++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java Thu Aug 2 04:52:36 2012 @@ -31,53 +31,52 @@ public class ElasticsearchConfigurationT @Test public void localNode() throws Exception { - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(); - URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet"); + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); Map<String, Object> parameters = URISupport.parseParameters(uri); - conf.parseURI(uri, parameters, null); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertTrue(conf.isLocal()); assertEquals("twitter", conf.getIndexName()); assertEquals("tweet", conf.getIndexType()); + assertEquals("INDEX", conf.getOperation()); assertTrue(conf.isData()); assertNull(conf.getClusterName()); } @Test(expected = IllegalArgumentException.class) public 
void localNonDataNodeThrowsIllegalArgumentException() throws Exception { - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(); - URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=false"); + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=false"); Map<String, Object> parameters = URISupport.parseParameters(uri); - conf.parseURI(uri, parameters, null); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); } @Test public void localConfDefaultsToDataNode() throws Exception { - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(); - URI uri = new URI("elasticsearch://local?indexName=twitter&indexType=tweet"); + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); Map<String, Object> parameters = URISupport.parseParameters(uri); - conf.parseURI(uri, parameters, null); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); + assertEquals("INDEX", conf.getOperation()); assertTrue(conf.isLocal()); assertTrue(conf.isData()); } @Test public void clusterConfDefaultsToNonDataNode() throws Exception { - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(); - URI uri = new URI("elasticsearch://clustername?indexName=twitter&indexType=tweet"); + URI uri = new URI("elasticsearch://clustername?operation=INDEX&indexName=twitter&indexType=tweet"); Map<String, Object> parameters = URISupport.parseParameters(uri); - conf.parseURI(uri, parameters, null); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertEquals("clustername", conf.getClusterName()); + assertEquals("INDEX", conf.getOperation()); assertFalse(conf.isLocal()); assertFalse(conf.isData()); } @Test public void localDataNode() throws Exception { - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(); - URI uri = new 
URI("elasticsearch://local?indexName=twitter&indexType=tweet&data=true"); + URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=true"); Map<String, Object> parameters = URISupport.parseParameters(uri); - conf.parseURI(uri, parameters, null); + ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); assertTrue(conf.isLocal()); + assertEquals("INDEX", conf.getOperation()); assertEquals("twitter", conf.getIndexName()); assertEquals("tweet", conf.getIndexType()); assertTrue(conf.isData()); Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java (original) +++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchIndexNameAndTypeInHeaderComponentTest.java Thu Aug 2 04:52:36 2012 @@ -35,6 +35,7 @@ public class ElasticsearchIndexNameAndTy } }, new HashMap() { { + put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); } Modified: camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java (original) +++ camel/trunk/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java Thu Aug 2 04:52:36 2012 @@ -56,6 +56,7 @@ public class SpringElasticsearchTest ext } }, new HashMap() { { + put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); } Modified: camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml?rev=1368351&r1=1368350&r2=1368351&view=diff ============================================================================== --- camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml (original) +++ camel/trunk/components/camel-elasticsearch/src/test/resources/SpringElasticsearchTest.xml Thu Aug 2 04:52:36 2012 @@ -7,7 +7,7 @@ <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:index" /> - <to uri="elasticsearch://local?indexName=twitter&amp;indexType=tweet"/> + <to uri="elasticsearch://local?operation=INDEX&amp;indexName=twitter&amp;indexType=tweet"/> <to uri="mock:result"/> </route> </camelContext>