CAMEL-9452: Camel-Elasticsearch: Support Multisearch operation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b274ec3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b274ec3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b274ec3 Branch: refs/heads/master Commit: 0b274ec3fb29bc6e3ce253105db813b753f6fa5f Parents: a2b6ab7 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sat Dec 26 12:35:52 2015 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sat Dec 26 12:37:35 2015 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchConstants.java | 1 + .../elasticsearch/ElasticsearchProducer.java | 6 ++++ .../ElasticsearchActionRequestConverter.java | 13 ++++++++ .../elasticsearch/ElasticsearchBaseTest.java | 4 +-- ...icsearchGetSearchDeleteExistsUpdateTest.java | 33 +++++++++++++++++++- 5 files changed, 54 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0b274ec3/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java index dab1cc8..e4f2f39 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java @@ -30,6 +30,7 @@ public interface ElasticsearchConstants { String OPERATION_MULTIGET = "MULTIGET"; String OPERATION_DELETE = "DELETE"; String OPERATION_SEARCH = "SEARCH"; + String OPERATION_MULTISEARCH = "MULTISEARCH"; String OPERATION_EXISTS = "EXISTS"; String PARAM_INDEX_ID = "indexId"; String PARAM_DATA = "data"; http://git-wip-us.apache.org/repos/asf/camel/blob/0b274ec3/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 3e52607..136b610 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.exists.ExistsRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; @@ -79,6 +80,8 @@ public class ElasticsearchProducer extends DefaultProducer { return ElasticsearchConstants.OPERATION_EXISTS; } else if (request instanceof SearchRequest) { return ElasticsearchConstants.OPERATION_SEARCH; + } else if (request instanceof MultiGetRequest) { + return ElasticsearchConstants.OPERATION_MULTISEARCH; } String operationConfig = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_OPERATION, String.class); @@ -160,6 +163,9 @@ public class ElasticsearchProducer extends DefaultProducer { } else if (ElasticsearchConstants.OPERATION_SEARCH.equals(operation)) { SearchRequest searchRequest = message.getBody(SearchRequest.class); message.setBody(client.search(searchRequest).actionGet()); + } else if (ElasticsearchConstants.OPERATION_MULTISEARCH.equals(operation)) { + MultiSearchRequest multiSearchRequest = message.getBody(MultiSearchRequest.class); + message.setBody(client.multiSearch(multiSearchRequest)); } else { throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported"); } http://git-wip-us.apache.org/repos/asf/camel/blob/0b274ec3/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java index 98957c9..aa50563 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -135,6 +136,18 @@ public final class ElasticsearchActionRequestConverter { } return multiGetRequest; } + + @Converter + public static MultiSearchRequest toMultiSearchRequest(Object document, Exchange exchange) { + List<SearchRequest> items = (List<SearchRequest>) document; + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + Iterator<SearchRequest> it = items.iterator(); + while (it.hasNext()) { + SearchRequest item = (SearchRequest) it.next(); + multiSearchRequest.add(item); + } + return multiSearchRequest; + } @Converter public static DeleteRequest toDeleteRequest(String id, Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/0b274ec3/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchBaseTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchBaseTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchBaseTest.java index c04c77a..7ac2730 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchBaseTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchBaseTest.java @@ -31,8 +31,8 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder; public class ElasticsearchBaseTest extends CamelTestSupport { - private static Node node; - private static Client client; + public static Node node; + public static Client client; @BeforeClass public static void cleanupOnce() { http://git-wip-us.apache.org/repos/asf/camel/blob/0b274ec3/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteExistsUpdateTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteExistsUpdateTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteExistsUpdateTest.java index a70251d..70ac14f 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteExistsUpdateTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchGetSearchDeleteExistsUpdateTest.java @@ -24,14 +24,17 @@ import java.util.Map; import org.apache.camel.builder.RouteBuilder; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.exists.ExistsResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest.Item; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; import org.junit.Test; import static org.hamcrest.CoreMatchers.equalTo; @@ -185,6 +188,33 @@ public class ElasticsearchGetSearchDeleteExistsUpdateTest extends ElasticsearchB assertFalse("response 2 should be ok", responses[1].isFailed()); assertTrue("response 3 should be failed", responses[2].isFailed()); } + + @Test + public void testMultiSearch() throws Exception { + //first, INDEX two values + Map<String, Object> headers = new HashMap<String, Object>(); + + node.client().prepareIndex("test", "type", "1").setSource("field", "xxx").execute().actionGet(); + node.client().prepareIndex("test", "type", "2").setSource("field", "yyy").execute().actionGet(); + + //now, verify MULTISEARCH + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_MULTISEARCH); + SearchRequestBuilder srb1 = node.client().prepareSearch("test").setTypes("type").setQuery(QueryBuilders.termQuery("field", "xxx")); + SearchRequestBuilder srb2 = node.client().prepareSearch("test").setTypes("type").setQuery(QueryBuilders.termQuery("field", "yyy")); + SearchRequestBuilder srb3= node.client().prepareSearch("instagram") + .setTypes("type").setQuery(QueryBuilders.termQuery("test-multisearchkey", "test-multisearchvalue")); + List<SearchRequest> list = new ArrayList<>(); + list.add(srb1.request()); + list.add(srb2.request()); + list.add(srb3.request()); + MultiSearchResponse response = template.requestBodyAndHeaders("direct:multisearch", list, headers, MultiSearchResponse.class); + MultiSearchResponse.Item[] responses = response.getResponses(); + assertNotNull("response should not be null", response); + assertEquals("response should contains three multiSearchResponse object", 3, response.getResponses().length); + assertFalse("response 1 should be ok", responses[0].isFailure()); + assertFalse("response 2 should be ok", responses[1].isFailure()); + assertTrue("response 3 should be failed", responses[2].isFailure()); + } @Test public void testDeleteWithHeaders() throws Exception { @@ -286,6 +316,7 @@ public class ElasticsearchGetSearchDeleteExistsUpdateTest extends ElasticsearchB from("direct:search").to("elasticsearch://local?operation=SEARCH&indexName=twitter&indexType=tweet"); from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet"); from("direct:exists").to("elasticsearch://local?operation=EXISTS"); + from("direct:multisearch").to("elasticsearch://local?operation=MULTISEARCH&indexName=test"); } }; }