Repository: camel Updated Branches: refs/heads/master 0afc35ea6 -> a9ba5b98a
CAMEL-9400: Camel-Elasticsearch: Add Multiget Operation support Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c81e302 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c81e302 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c81e302 Branch: refs/heads/master Commit: 1c81e302c81a5f3abb1d20c21d231194a1744e72 Parents: 0afc35e Author: Andrea Cosentino <anco...@gmail.com> Authored: Tue Dec 8 14:30:35 2015 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Tue Dec 8 14:30:35 2015 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchConstants.java | 1 + .../elasticsearch/ElasticsearchProducer.java | 6 +++ .../ElasticsearchActionRequestConverter.java | 15 +++++++ .../ElasticsearchGetSearchDeleteUpdateTest.java | 47 ++++++++++++++++++++ 4 files changed, 69 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1c81e302/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java index 7ccbf00..caae8c8 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java @@ -27,6 +27,7 @@ public interface ElasticsearchConstants { String OPERATION_BULK = "BULK"; String OPERATION_BULK_INDEX = "BULK_INDEX"; String OPERATION_GET_BY_ID = "GET_BY_ID"; + String OPERATION_MULTIGET = "MULTIGET"; String OPERATION_DELETE = "DELETE"; String OPERATION_SEARCH = "SEARCH"; String PARAM_INDEX_ID = "indexId"; http://git-wip-us.apache.org/repos/asf/camel/blob/1c81e302/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 4eac708..9a54dde 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -60,6 +61,8 @@ public class ElasticsearchProducer extends DefaultProducer { return ElasticsearchConstants.OPERATION_INDEX; } else if (request instanceof GetRequest) { return ElasticsearchConstants.OPERATION_GET_BY_ID; + } else if (request instanceof MultiGetRequest) { + return ElasticsearchConstants.OPERATION_MULTIGET; } else if (request instanceof UpdateRequest) { return ElasticsearchConstants.OPERATION_UPDATE; } else if (request instanceof BulkRequest) { @@ -132,6 +135,9 @@ public class ElasticsearchProducer extends DefaultProducer { } else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) { GetRequest getRequest = message.getBody(GetRequest.class); message.setBody(client.get(getRequest)); + } else if (ElasticsearchConstants.OPERATION_MULTIGET.equals(operation)) { + MultiGetRequest multiGetRequest = message.getBody(MultiGetRequest.class); + message.setBody(client.multiGet(multiGetRequest)); } else if (ElasticsearchConstants.OPERATION_BULK.equals(operation)) { BulkRequest bulkRequest = message.getBody(BulkRequest.class); message.setBody(client.bulk(bulkRequest).actionGet()); http://git-wip-us.apache.org/repos/asf/camel/blob/1c81e302/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java index fcb1866..92a4ddf 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.elasticsearch.converter; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -26,6 +27,8 @@ import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -113,6 +116,18 @@ public final class ElasticsearchActionRequestConverter { ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).id(id); } + + @Converter + public static MultiGetRequest toMultiGetRequest(Object document, Exchange exchange) { + List<Item> items = (List<Item>) document; + MultiGetRequest multiGetRequest = new MultiGetRequest(); + Iterator<Item> it = items.iterator(); + while (it.hasNext()) { + MultiGetRequest.Item item = (MultiGetRequest.Item) it.next(); + multiGetRequest.add(item); + } + return multiGetRequest; + } @Converter public static DeleteRequest toDeleteRequest(String id, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/1c81e302/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteUpdateTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteUpdateTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteUpdateTest.java index d02ce86..75ddc56 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteUpdateTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteUpdateTest.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.elasticsearch; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.camel.builder.RouteBuilder; @@ -24,6 +26,9 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest.Item; +import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.junit.Test; @@ -119,6 +124,47 @@ public class ElasticsearchGetSearchDeleteUpdateTest extends ElasticsearchBaseTes assertNotNull("response should not be null", response); assertNotNull("response source should not be null", response.getSource()); } + + @Test + public void testMultiGet() throws Exception { + //first, INDEX two values + Map<String, String> map = createIndexedData(); + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "1"); + + template.requestBodyAndHeaders("direct:start", map, headers, String.class); + + headers.clear(); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "facebook"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "status"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "2"); + + template.requestBodyAndHeaders("direct:start", map, headers, String.class); + headers.clear(); + + //now, verify MULTIGET + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_MULTIGET); + Item item1 = new Item("twitter", "tweet", "1"); + Item item2 = new Item("facebook", "status", "2"); + Item item3 = new Item("instagram", "latest", "3"); + List<Item> list = new ArrayList<Item>(); + list.add(item1); + list.add(item2); + list.add(item3); + MultiGetResponse response = template.requestBodyAndHeaders("direct:start", list, headers, MultiGetResponse.class); + MultiGetItemResponse[] responses = response.getResponses(); + assertNotNull("response should not be null", response); + assertEquals("response should contains three multiGetResponse object", 3, response.getResponses().length); + assertEquals("response 1 should contains tweet as type", "tweet", responses[0].getResponse().getType().toString()); + assertEquals("response 2 should contains status as type", "status", responses[1].getResponse().getType().toString()); + assertFalse("response 1 should be ok", responses[0].isFailed()); + assertFalse("response 2 should be ok", responses[1].isFailed()); + assertTrue("response 3 should be failed", responses[2].isFailed()); + } @Test public void testDeleteWithHeaders() throws Exception { @@ -215,6 +261,7 @@ public class ElasticsearchGetSearchDeleteUpdateTest extends ElasticsearchBaseTes from("direct:start").to("elasticsearch://local"); from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); + from("direct:multiget").to("elasticsearch://local?operation=MULTIGET&indexName=twitter&indexType=tweet"); from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet"); from("direct:search").to("elasticsearch://local?operation=SEARCH&indexName=twitter&indexType=tweet"); from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet");