Repository: camel Updated Branches: refs/heads/master 97634ae69 -> 16d352853
CAMEL-8149: Enhance elasticsearch producer to support elasticsearch-java ActionRequest object type bodies Add unit tests Remove author tag; don't use type converter getBody method in aggregator Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/69de2e3d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/69de2e3d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/69de2e3d Branch: refs/heads/master Commit: 69de2e3dd8f9d0e61b3a9e6e5495e406c9159909 Parents: 97634ae Author: Derek Abdine <dabd...@rapid7.com> Authored: Thu Dec 11 17:12:03 2014 -0800 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Dec 23 10:58:03 2014 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchProducer.java | 303 ++++++++----------- .../BulkRequestAggregationStrategy.java | 57 ++++ .../ElasticsearchActionRequestConverter.java | 107 +++++++ .../services/org/apache/camel/TypeConverter | 17 ++ .../ElasticsearchComponentTest.java | 74 +++++ 5 files changed, 381 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index e05eccc..5a25507 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -18,193 +18,142 @@ package 
org.apache.camel.component.elasticsearch; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.camel.Exchange; -import org.apache.camel.ExpectedBodyTypeException; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.xcontent.XContentBuilder; - /** * Represents an Elasticsearch producer. 
*/ public class ElasticsearchProducer extends DefaultProducer { - public ElasticsearchProducer(ElasticsearchEndpoint endpoint) { - super(endpoint); - } - - @Override - public ElasticsearchEndpoint getEndpoint() { - return (ElasticsearchEndpoint) super.getEndpoint(); - } - - public void process(Exchange exchange) throws Exception { - String operation = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, String.class); - if (operation == null) { - operation = getEndpoint().getConfig().getOperation(); - } - - if (operation == null) { - throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is missing"); - } - - Client client = getEndpoint().getClient(); - - if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_INDEX)) { - addToIndex(client, exchange); - } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_GET_BY_ID)) { - getById(client, exchange); - } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_DELETE)) { - deleteById(client, exchange); - } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_BULK_INDEX)) { - addToIndexUsingBulk(client, exchange); - } else { - throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported"); - } - } - - public void getById(Client client, Exchange exchange) { - String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); - if (indexName == null) { - indexName = getEndpoint().getConfig().getIndexName(); - } - - String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); - if (indexType == null) { - indexType = getEndpoint().getConfig().getIndexType(); - } - - String indexId = exchange.getIn().getBody(String.class); - - GetResponse response = client.prepareGet(indexName, indexType, indexId).execute().actionGet(); - exchange.getIn().setBody(response); - } - - 
public void deleteById(Client client, Exchange exchange) { - String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); - if (indexName == null) { - indexName = getEndpoint().getConfig().getIndexName(); - } - - String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); - if (indexType == null) { - indexType = getEndpoint().getConfig().getIndexType(); - } - - String indexId = exchange.getIn().getBody(String.class); - - DeleteResponse response = client.prepareDelete(indexName, indexType, indexId).execute().actionGet(); - exchange.getIn().setBody(response); - } - - public void addToIndex(Client client, Exchange exchange) { - String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); - if (indexName == null) { - indexName = getEndpoint().getConfig().getIndexName(); - } - - String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); - if (indexType == null) { - indexType = getEndpoint().getConfig().getIndexType(); - } - - String indexId = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, String.class); - - IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType, indexId); - - Object document = extractDocumentFromMessage(exchange.getIn()); - - if (!setIndexRequestSource(document, prepareIndex)) { - throw new ExpectedBodyTypeException(exchange, XContentBuilder.class); - } - ListenableActionFuture<IndexResponse> future = prepareIndex.execute(); - IndexResponse response = future.actionGet(); - exchange.getIn().setBody(response.getId()); - } - - public void addToIndexUsingBulk(Client client, Exchange exchange) { - String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); - if (indexName == null) { - indexName = getEndpoint().getConfig().getIndexName(); - } - - String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); - if (indexType == null) { - indexType = getEndpoint().getConfig().getIndexType(); - } - - log.debug("Preparing Bulk Request"); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - - List<?> body = exchange.getIn().getBody(List.class); - - for (Object document : body) { - IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType); - log.trace("Indexing document : {}", document); - if (!setIndexRequestSource(document, prepareIndex)) { - throw new ExpectedBodyTypeException(exchange, XContentBuilder.class); - } - bulkRequest.add(prepareIndex); - } - - ListenableActionFuture<BulkResponse> future = bulkRequest.execute(); - BulkResponse bulkResponse = future.actionGet(); - - List<String> indexedIds = new LinkedList<String>(); - for (BulkItemResponse response : bulkResponse.getItems()) { - indexedIds.add(response.getId()); - } - log.debug("List of successfully indexed document ids : {}", indexedIds); - exchange.getIn().setBody(indexedIds); - } - - - private Object extractDocumentFromMessage(Message msg) { - Object body = null; - - // order is important - Class<?>[] types = new Class[] {XContentBuilder.class, Map.class, byte[].class, String.class}; - - for (int i = 0; i < types.length && body == null; i++) { - Class<?> type = types[i]; - body = msg.getBody(type); - } - - return body; - - } - - - @SuppressWarnings("unchecked") - private boolean setIndexRequestSource(Object document, IndexRequestBuilder builder) { - boolean converted = false; - - if (document != null) { - converted = true; - if (document instanceof byte[]) { - builder.setSource((byte[])document); - } else if (document instanceof Map) { - builder.setSource((Map<String, Object>) document); - } else if (document instanceof String) { - builder.setSource((String)document); - } else if (document instanceof XContentBuilder) { - builder.setSource((XContentBuilder)document); - } else { - converted = false; - 
}
-        }
-        return converted;
-    }
+    public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public ElasticsearchEndpoint getEndpoint() {
+        return (ElasticsearchEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Resolves the operation to perform. In order of preference it is taken from
+     * the type of the body when that is an elasticsearch request, then from the
+     * operation header, and finally from the endpoint configuration.
+     *
+     * @param exchange the exchange whose in-message body and headers are inspected
+     * @return the resolved operation name
+     * @throws IllegalArgumentException if no source provides an operation
+     */
+    private String resolveOperation(Exchange exchange) {
+        Object request = exchange.getIn().getBody();
+        if (request instanceof IndexRequest) {
+            return ElasticsearchConfiguration.OPERATION_INDEX;
+        } else if (request instanceof GetRequest) {
+            return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
+        } else if (request instanceof BulkRequest) {
+            return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+        } else if (request instanceof DeleteRequest) {
+            return ElasticsearchConfiguration.OPERATION_DELETE;
+        }
+
+        String operationConfig = exchange.getIn().getHeader(
+                ElasticsearchConfiguration.PARAM_OPERATION, String.class);
+        if (operationConfig == null) {
+            operationConfig = getEndpoint().getConfig().getOperation();
+        }
+        if (operationConfig == null) {
+            throw new IllegalArgumentException(
+                    ElasticsearchConfiguration.PARAM_OPERATION + " is missing");
+        }
+        return operationConfig;
+    }
+
+    /**
+     * Executes the resolved operation against the endpoint's client and puts the
+     * result into the in-message body. For non-request bodies the index name and
+     * type come from the headers, falling back to the endpoint configuration;
+     * they are exposed as headers only while the body is being converted.
+     * Missing values are not checked here but left to the elasticsearch client.
+     *
+     * @param exchange the exchange to process
+     * @throws IllegalArgumentException if the operation is missing or unsupported
+     */
+    public void process(Exchange exchange) throws Exception {
+        Message message = exchange.getIn();
+        ElasticsearchConfiguration config = getEndpoint().getConfig();
+        final String operation = resolveOperation(exchange);
+
+        // Set the index/type headers on the exchange if necessary. This is used
+        // for type conversion.
+        boolean configIndexName = false;
+        String indexName = message.getHeader(
+                ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        if (indexName == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME,
+                    config.getIndexName());
+            configIndexName = true;
+        }
+
+        boolean configIndexType = false;
+        String indexType = message.getHeader(
+                ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        if (indexType == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE,
+                    config.getIndexType());
+            configIndexType = true;
+        }
+
+        Client client = getEndpoint().getClient();
+        try {
+            if (ElasticsearchConfiguration.OPERATION_INDEX.equalsIgnoreCase(operation)) {
+                IndexRequest indexRequest = message.getBody(IndexRequest.class);
+                message.setBody(client.index(indexRequest).actionGet().getId());
+            } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equalsIgnoreCase(operation)) {
+                GetRequest getRequest = message.getBody(GetRequest.class);
+                // Resolve the future so the body is the response itself, not the pending call.
+                message.setBody(client.get(getRequest).actionGet());
+            } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equalsIgnoreCase(operation)) {
+                BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+                List<String> indexedIds = new LinkedList<String>();
+                for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) {
+                    indexedIds.add(response.getId());
+                }
+                log.debug("List of successfully indexed document ids : {}", indexedIds);
+                message.setBody(indexedIds);
+            } else if (ElasticsearchConfiguration.OPERATION_DELETE.equalsIgnoreCase(operation)) {
+                DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
+                message.setBody(client.delete(deleteRequest).actionGet());
+            } else {
+                throw new IllegalArgumentException(
+                        ElasticsearchConfiguration.PARAM_OPERATION + " value '"
+                                + operation + "' is not supported");
+            }
+        } finally {
+            // If we set params via the configuration on this exchange, remove
+            // them now, even when the operation failed. This preserves legacy
+            // behavior for this component and enables a use case where one
+            // message can be sent to multiple elasticsearch endpoints where the
+            // user is relying on the endpoint configuration (index/type) rather
+            // than header values. If we do not clear this out, sending the same
+            // message (index request, for example) to multiple elasticsearch
+            // endpoints would have the effect of overriding any subsequent
+            // endpoint index/type with the first endpoint index/type. Doing it
+            // in a finally block keeps a failed operation from leaking the
+            // configured values into the headers.
+            if (configIndexName) {
+                message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
+            }
+            if (configIndexType) {
+                message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
+            }
+        }
+    }
 }
http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
new file mode 100644
index 0000000..f32fb0a
--- /dev/null
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.aggregation;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+
+/**
+ * Aggregates two {@link ActionRequest}s into a single {@link BulkRequest}.
+ */
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
+    /**
+     * Adds the body of {@code newExchange} to the {@link BulkRequest} being
+     * accumulated, creating that bulk request on the first invocation.
+     *
+     * @param oldExchange the exchange holding the bulk request built so far, or
+     *                    {@code null} on the first invocation
+     * @param newExchange the incoming exchange; its body must be an {@link ActionRequest}
+     * @return {@code newExchange} on the first invocation, otherwise {@code oldExchange}
+     * @throws RuntimeCamelException if the incoming body is not an {@link ActionRequest}
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
+        Object objBody = newExchange.getIn().getBody();
+        if (!(objBody instanceof ActionRequest)) {
+            // A null body also ends up here, so guard before asking for its class.
+            throw new RuntimeCamelException("Invalid body type for elastisearch bulk request aggregation strategy: "
+                    + (objBody == null ? "null" : objBody.getClass().getName()));
+        }
+
+        ActionRequest newBody = (ActionRequest) objBody;
+        if (oldExchange == null) {
+            BulkRequest request = new BulkRequest();
+            request.add(newBody);
+            newExchange.getIn().setBody(request);
+            return newExchange;
+        }
+        oldExchange.getIn().getBody(BulkRequest.class).add(newBody);
+        return oldExchange;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
new file mode 100644
index 0000000..2ad0d92
--- /dev/null
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.converter;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+@Converter
+public class ElasticsearchActionRequestConverter {
+    // Looks up a header on the in-message, asking for it as a String.
+    private static String header(Exchange exchange, String name) {
+        return exchange.getIn().getHeader(name, String.class);
+    }
+
+    /** Builds an index request (index/type from headers); {@code null} for an unsupported document type. */
+    @SuppressWarnings("unchecked") // the Map branch casts to Map<String, Object> without checking the keys
+    private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+        IndexRequest indexRequest = new IndexRequest();
+        if (document instanceof byte[]) {
+            indexRequest.source((byte[]) document);
+        } else if (document instanceof Map) {
+            indexRequest.source((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            indexRequest.source((String) document);
+        } else if (document instanceof XContentBuilder) {
+            indexRequest.source((XContentBuilder) document);
+        } else {
+            return null; // also reached for a null document
+        }
+        return indexRequest.index(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_NAME))
+                .type(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_TYPE));
+    }
+
+    /** Converts a document into an index request (id from the index-id header); {@code null} if unsupported. */
+    @Converter
+    public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+        IndexRequest request = createIndexRequest(document, exchange);
+        if (request == null) {
+            return null;
+        }
+        return request.id(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_ID));
+    }
+
+    /** Converts a document id into a get request for the index/type named in the headers. */
+    @Converter
+    public static GetRequest toGetRequest(String id, Exchange exchange) {
+        if (id == null) {
+            return null;
+        }
+        return new GetRequest(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_NAME))
+                .type(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_TYPE)).id(id);
+    }
+
+    /** Converts a document id into a delete request for the index/type named in the headers. */
+    @Converter
+    public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
+        if (id == null) {
+            return null;
+        }
+        return new DeleteRequest().index(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_NAME))
+                .type(header(exchange, ElasticsearchConfiguration.PARAM_INDEX_TYPE)).id(id);
+    }
+
+    /**
+     * Converts a list of documents into a bulk request holding one index request per document.
+     * @return the bulk request, or {@code null} if the list is {@code null}
+     * @throws IllegalArgumentException if an element is {@code null} or of an unsupported type
+     */
+    @Converter
+    public static BulkRequest toBulkRequest(List<Object> documents, Exchange exchange) {
+        if (documents == null) {
+            return null;
+        }
+        BulkRequest request = new BulkRequest();
+        for (Object document : documents) {
+            IndexRequest indexRequest = createIndexRequest(document, exchange);
+            if (indexRequest == null) {
+                throw new IllegalArgumentException("Unsupported document type for bulk index: "
+                        + (document == null ? "null" : document.getClass().getName()));
+            }
+            request.add(indexRequest);
+        }
+        return request;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..53a8732
--- /dev/null
+++ b/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.camel.component.elasticsearch.converter.ElasticsearchActionRequestConverter \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 7bc2a2c..8398c2a 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -23,12 +23,20 @@ import java.util.Map; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; 
+import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.notNullValue; + public class ElasticsearchComponentTest extends CamelTestSupport { @@ -217,9 +225,75 @@ public class ElasticsearchComponentTest extends CamelTestSupport { assertNull("response source should be null", response.getSource()); } + @Test + public void indexRequestBody() throws Exception { + // given + IndexRequest request = new IndexRequest("foo", "bar", "testId"); + request.source("{\"content\": \"hello\"}"); + + // when + String documentId = template.requestBody("direct:index", request, + String.class); + + // then + assertThat(documentId, equalTo("testId")); + } + + @Test + public void getRequestBody() throws Exception { + // given + GetRequest request = new GetRequest("foo").type("bar"); + + // when + String documentId = template.requestBody("direct:index", + new IndexRequest("foo", "bar", "testId") + .source("{\"content\": \"hello\"}"), String.class); + GetResponse response = template.requestBody("direct:get", + request.id(documentId), GetResponse.class); + + // then + assertThat(response, notNullValue()); + assertThat(response.getSourceAsMap().get("content"), equalTo((Object) "hello")); + } + + @Test + public void deleteRequestBody() throws Exception { + // given + DeleteRequest request = new DeleteRequest("foo").type("bar"); + + // when + String documentId = template.requestBody("direct:index", + new IndexRequest("foo", "bar", "testId") + .source("{\"content\": \"hello\"}"), String.class); + DeleteResponse response = template.requestBody("direct:delete", + request.id(documentId), DeleteResponse.class); + + // then + assertThat(response, notNullValue()); + assertThat(response.getId(), equalTo(documentId)); + } + + @Test + public void bulkRequestBody() throws Exception { + // given + BulkRequest request = new BulkRequest(); + request.add(new IndexRequest("foo", "bar", "baz") + .source("{\"content\": \"hello\"}")); + +
// when + List<String> indexedDocumentIds = template.requestBody( + "direct:bulk_index", request, List.class); + + // then + assertThat(indexedDocumentIds, notNullValue()); + assertThat(indexedDocumentIds.size(), equalTo(1)); + assertThat(indexedDocumentIds, hasItem("baz")); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { + @Override public void configure() { from("direct:start").to("elasticsearch://local"); from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");