Repository: camel
Updated Branches:
  refs/heads/master 97634ae69 -> 16d352853


CAMEL-8149: Enhance elasticsearch producer to support elasticsearch-java 
ActionRequest object type bodies

Add unit tests

Remove author tag; don't use type converter getBody method in aggregator


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/69de2e3d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/69de2e3d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/69de2e3d

Branch: refs/heads/master
Commit: 69de2e3dd8f9d0e61b3a9e6e5495e406c9159909
Parents: 97634ae
Author: Derek Abdine <dabd...@rapid7.com>
Authored: Thu Dec 11 17:12:03 2014 -0800
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Dec 23 10:58:03 2014 +0100

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchProducer.java    | 303 ++++++++-----------
 .../BulkRequestAggregationStrategy.java         |  57 ++++
 .../ElasticsearchActionRequestConverter.java    | 107 +++++++
 .../services/org/apache/camel/TypeConverter     |  17 ++
 .../ElasticsearchComponentTest.java             |  74 +++++
 5 files changed, 381 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index e05eccc..5a25507 100644
--- 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -18,193 +18,142 @@ package org.apache.camel.component.elasticsearch;
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExpectedBodyTypeException;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
-import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-
 
 /**
  * Represents an Elasticsearch producer.
  */
 public class ElasticsearchProducer extends DefaultProducer {
 
-    public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
-        super(endpoint);
-    }
-
-    @Override
-    public ElasticsearchEndpoint getEndpoint() {
-        return (ElasticsearchEndpoint) super.getEndpoint();
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        String operation = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, 
String.class);
-        if (operation == null) {
-            operation = getEndpoint().getConfig().getOperation();
-        }
-
-        if (operation == null) {
-            throw new 
IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is 
missing");
-        }
-
-        Client client = getEndpoint().getClient();
-
-        if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_INDEX)) {
-            addToIndex(client, exchange);
-        } else if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_GET_BY_ID)) {
-            getById(client, exchange);
-        } else if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_DELETE)) {
-            deleteById(client, exchange);
-        } else if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_BULK_INDEX)) {
-            addToIndexUsingBulk(client, exchange);
-        } else {
-            throw new 
IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value 
'" + operation + "' is not supported");
-        }
-    }
-
-    public void getById(Client client, Exchange exchange) {
-        String indexName = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, 
String.class);
-        if (indexName == null) {
-            indexName = getEndpoint().getConfig().getIndexName();
-        }
-
-        String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, 
String.class);
-        if (indexType == null) {
-            indexType = getEndpoint().getConfig().getIndexType();
-        }
-
-        String indexId = exchange.getIn().getBody(String.class);
-
-        GetResponse response = client.prepareGet(indexName, indexType, 
indexId).execute().actionGet();
-        exchange.getIn().setBody(response);
-    }
-
-    public void deleteById(Client client, Exchange exchange) {
-        String indexName = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, 
String.class);
-        if (indexName == null) {
-            indexName = getEndpoint().getConfig().getIndexName();
-        }
-
-        String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, 
String.class);
-        if (indexType == null) {
-            indexType = getEndpoint().getConfig().getIndexType();
-        }
-
-        String indexId = exchange.getIn().getBody(String.class);
-
-        DeleteResponse response = client.prepareDelete(indexName, indexType, 
indexId).execute().actionGet();
-        exchange.getIn().setBody(response);
-    }
-
-    public void addToIndex(Client client, Exchange exchange) {
-        String indexName = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, 
String.class);
-        if (indexName == null) {
-            indexName = getEndpoint().getConfig().getIndexName();
-        }
-
-        String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, 
String.class);
-        if (indexType == null) {
-            indexType = getEndpoint().getConfig().getIndexType();
-        }
-
-        String indexId = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, 
String.class);
-
-        IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, 
indexType, indexId);
-
-        Object document = extractDocumentFromMessage(exchange.getIn());
-
-        if (!setIndexRequestSource(document, prepareIndex)) {
-            throw new ExpectedBodyTypeException(exchange, 
XContentBuilder.class);
-        }
-        ListenableActionFuture<IndexResponse> future = prepareIndex.execute();
-        IndexResponse response = future.actionGet();
-        exchange.getIn().setBody(response.getId());
-    }
-
-    public void addToIndexUsingBulk(Client client, Exchange exchange) {
-        String indexName = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, 
String.class);
-        if (indexName == null) {
-            indexName = getEndpoint().getConfig().getIndexName();
-        }
-
-        String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, 
String.class);
-        if (indexType == null) {
-            indexType = getEndpoint().getConfig().getIndexType();
-        }
-
-        log.debug("Preparing Bulk Request");
-        BulkRequestBuilder bulkRequest = client.prepareBulk();
-
-        List<?> body = exchange.getIn().getBody(List.class);
-
-        for (Object document : body) {
-            IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, 
indexType);
-            log.trace("Indexing document : {}", document);
-            if (!setIndexRequestSource(document, prepareIndex)) {
-                throw new ExpectedBodyTypeException(exchange, 
XContentBuilder.class);
-            }
-            bulkRequest.add(prepareIndex);
-        }
-
-        ListenableActionFuture<BulkResponse> future = bulkRequest.execute();
-        BulkResponse bulkResponse = future.actionGet();
-
-        List<String> indexedIds = new LinkedList<String>();
-        for (BulkItemResponse response : bulkResponse.getItems()) {
-            indexedIds.add(response.getId());
-        }
-        log.debug("List of successfully indexed document ids : {}", 
indexedIds);
-        exchange.getIn().setBody(indexedIds);
-    }
-
-
-    private Object extractDocumentFromMessage(Message msg) {
-        Object body = null;
-
-        // order is important
-        Class<?>[] types = new Class[] {XContentBuilder.class, Map.class, 
byte[].class, String.class};
-
-        for (int i = 0; i < types.length && body == null; i++) {
-            Class<?> type = types[i];
-            body = msg.getBody(type);
-        }
-
-        return body;
-
-    }
-
-
-    @SuppressWarnings("unchecked")
-    private boolean setIndexRequestSource(Object document, IndexRequestBuilder 
builder) {
-        boolean converted = false;
-
-        if (document != null) {
-            converted = true;
-            if (document instanceof byte[]) {
-                builder.setSource((byte[])document);
-            } else if (document instanceof Map) {
-                builder.setSource((Map<String, Object>) document);
-            } else if (document instanceof String) {
-                builder.setSource((String)document);
-            } else if (document instanceof XContentBuilder) {
-                builder.setSource((XContentBuilder)document);
-            } else {
-                converted = false;
-            }
-        }
-        return converted;
-    }
+       /**
+        * Creates a producer for the given Elasticsearch endpoint.
+        *
+        * @param endpoint the endpoint this producer sends to; handed to the
+        *                 superclass constructor
+        */
+       public ElasticsearchProducer(ElasticsearchEndpoint endpoint) {
+               super(endpoint);
+       }
+
+       /**
+        * Returns the endpoint of this producer, cast to
+        * {@link ElasticsearchEndpoint}.
+        *
+        * @return the endpoint returned by the superclass, as an
+        *         {@link ElasticsearchEndpoint}
+        */
+       @Override
+       public ElasticsearchEndpoint getEndpoint() {
+               return (ElasticsearchEndpoint) super.getEndpoint();
+       }
+
+    /**
+     * Determines which Elasticsearch operation to run for the exchange.
+     * <p>
+     * The sources are consulted in order of preference:
+     * <ol>
+     * <li>the type of the in-message body, when it already is an
+     * {@link IndexRequest}, {@link GetRequest}, {@link BulkRequest} or
+     * {@link DeleteRequest};</li>
+     * <li>the {@link ElasticsearchConfiguration#PARAM_OPERATION} header;</li>
+     * <li>the operation configured on the endpoint.</li>
+     * </ol>
+     *
+     * @param exchange the exchange being processed
+     * @return the resolved operation name, never {@code null}
+     * @throws IllegalArgumentException if none of the sources yields an operation
+     */
+    private String resolveOperation(Exchange exchange) {
+        // Look at the raw body: getBody(Class) would run type converters and
+        // could turn an unrelated body into a request.
+        Object request = exchange.getIn().getBody();
+        if (request instanceof IndexRequest) {
+            return ElasticsearchConfiguration.OPERATION_INDEX;
+        } else if (request instanceof GetRequest) {
+            return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
+        } else if (request instanceof BulkRequest) {
+            return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+        } else if (request instanceof DeleteRequest) {
+            return ElasticsearchConfiguration.OPERATION_DELETE;
+        }
+
+        String operationConfig = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, String.class);
+        if (operationConfig == null) {
+            operationConfig = getEndpoint().getConfig().getOperation();
+        }
+        if (operationConfig == null) {
+            // Nothing was supplied at all, so report it as missing rather than
+            // as an unsupported value 'null'.
+            throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " is missing");
+        }
+        return operationConfig;
+    }
+
+    /**
+     * Runs the resolved Elasticsearch operation and replaces the in-message
+     * body with its result.
+     * <p>
+     * The body is converted to the request type matching the operation. The
+     * index name and type used by that conversion are read from the message
+     * headers; when a header is absent, the endpoint configuration value is
+     * published as a header for the duration of this call and removed again
+     * afterwards, so the same message can be sent to several elasticsearch
+     * endpoints that each rely on their own configuration. This component does
+     * not validate the index or type itself.
+     * <p>
+     * Resulting body per operation: the document id for index, the response
+     * object for get-by-id and delete, and the list of item ids for bulk
+     * index.
+     *
+     * @param exchange the exchange whose in message carries the request
+     * @throws IllegalArgumentException if the operation is missing or not supported
+     * @throws Exception if the body cannot be converted to the required
+     *                   request type or the client call fails
+     */
+    public void process(Exchange exchange) throws Exception {
+        Message message = exchange.getIn();
+        ElasticsearchConfiguration config = getEndpoint().getConfig();
+        final String operation = resolveOperation(exchange);
+
+        // Set the index/type headers on the message if necessary. The type
+        // converters read them when building a request from a plain body.
+        boolean configIndexName = false;
+        if (message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class) == null) {
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, config.getIndexName());
+            configIndexName = true;
+        }
+
+        boolean configIndexType = false;
+        if (message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class) == null) {
+            // The type header comes from the configured index type, not the index name.
+            message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, config.getIndexType());
+            configIndexType = true;
+        }
+
+        try {
+            Client client = getEndpoint().getClient();
+            // Operation names are matched case-insensitively, as before this change.
+            if (ElasticsearchConfiguration.OPERATION_INDEX.equalsIgnoreCase(operation)) {
+                // getMandatoryBody fails with a descriptive payload exception
+                // instead of handing a null request to the client.
+                IndexRequest indexRequest = message.getMandatoryBody(IndexRequest.class);
+                message.setBody(client.index(indexRequest).actionGet().getId());
+            } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equalsIgnoreCase(operation)) {
+                GetRequest getRequest = message.getMandatoryBody(GetRequest.class);
+                // Wait for the response so the body is the response itself, not a pending future.
+                message.setBody(client.get(getRequest).actionGet());
+            } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equalsIgnoreCase(operation)) {
+                BulkRequest bulkRequest = message.getMandatoryBody(BulkRequest.class);
+                List<String> indexedIds = new LinkedList<String>();
+                for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) {
+                    indexedIds.add(response.getId());
+                }
+                log.debug("List of successfully indexed document ids : {}", indexedIds);
+                message.setBody(indexedIds);
+            } else if (ElasticsearchConfiguration.OPERATION_DELETE.equalsIgnoreCase(operation)) {
+                DeleteRequest deleteRequest = message.getMandatoryBody(DeleteRequest.class);
+                message.setBody(client.delete(deleteRequest).actionGet());
+            } else {
+                throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION
+                        + " value '" + operation + "' is not supported");
+            }
+        } finally {
+            // If we set params via the configuration on this message, remove
+            // them now, even when the operation failed. Otherwise sending the
+            // same message to a second elasticsearch endpoint would override
+            // that endpoint's index/type with the first endpoint's values.
+            if (configIndexName) {
+                message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME);
+            }
+            if (configIndexType) {
+                message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
new file mode 100644
index 0000000..f32fb0a
--- /dev/null
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/aggregation/BulkRequestAggregationStrategy.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.aggregation;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+
+/**
+ * Aggregates {@link ActionRequest}s into a single {@link BulkRequest}.
+ * <p>
+ * The first exchange of a group has its body replaced by a new
+ * {@link BulkRequest} holding that exchange's request; every later exchange
+ * has its request added to the bulk request carried by the aggregated
+ * exchange.
+ */
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Adds the request carried by {@code newExchange} to the bulk request
+     * being built.
+     *
+     * @param oldExchange the exchange aggregated so far, or {@code null} for
+     *                    the first exchange of a group
+     * @param newExchange the incoming exchange; its in-message body must be
+     *                    an {@link ActionRequest}
+     * @return {@code newExchange} (now carrying a new {@link BulkRequest})
+     *         when {@code oldExchange} is {@code null}, otherwise
+     *         {@code oldExchange}
+     * @throws RuntimeCamelException if the incoming body is {@code null} or
+     *                               not an {@link ActionRequest}
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
+        Object objBody = newExchange.getIn().getBody();
+        if (!(objBody instanceof ActionRequest)) {
+            // A null body also ends up here; name it instead of failing with
+            // a NullPointerException while building the message.
+            String bodyType = objBody == null ? "null" : objBody.getClass().getName();
+            throw new RuntimeCamelException("Invalid body type for elastisearch bulk request aggregation strategy: " + bodyType);
+        }
+
+        ActionRequest newBody = (ActionRequest) objBody;
+        if (oldExchange == null) {
+            // First exchange of the group: start a new bulk request on it.
+            BulkRequest request = new BulkRequest();
+            request.add(newBody);
+            newExchange.getIn().setBody(request);
+            return newExchange;
+        }
+
+        BulkRequest request = oldExchange.getIn().getBody(BulkRequest.class);
+        request.add(newBody);
+        return oldExchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
new file mode 100644
index 0000000..2ad0d92
--- /dev/null
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.converter;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+/**
+ * Type converters that build Elasticsearch action requests from plain
+ * message bodies. The index name, index type and (for index requests) the
+ * document id are read from the headers of the exchange's in message.
+ */
+@Converter
+public class ElasticsearchActionRequestConverter {
+
+    /**
+     * Builds an index request whose source is the given document and whose
+     * index and type come from the exchange headers.
+     *
+     * @param document the document; a {@code byte[]}, {@link Map},
+     *                 {@link String} or {@link XContentBuilder}
+     * @param exchange the exchange supplying the index name and type headers
+     * @return the request, or {@code null} if the document is of any other type
+     */
+    // The Map cast is unchecked because the body arrives untyped.
+    @SuppressWarnings("unchecked")
+    private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+        IndexRequest indexRequest = new IndexRequest();
+        if (document instanceof byte[]) {
+            indexRequest.source((byte[]) document);
+        } else if (document instanceof Map) {
+            indexRequest.source((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            indexRequest.source((String) document);
+        } else if (document instanceof XContentBuilder) {
+            indexRequest.source((XContentBuilder) document);
+        } else {
+            return null;
+        }
+
+        String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        return indexRequest.index(indexName).type(indexType);
+    }
+
+    /**
+     * Converts a document into an index request, using the index name, type
+     * and id headers of the exchange.
+     *
+     * @param document the document to index
+     * @param exchange the exchange supplying the headers
+     * @return the request, or {@code null} if the document is {@code null}
+     *         or of an unsupported type
+     */
+    @Converter
+    public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+        if (document == null) {
+            return null;
+        }
+
+        IndexRequest indexRequest = createIndexRequest(document, exchange);
+        if (indexRequest == null) {
+            // Unsupported body type: return null rather than dereferencing it.
+            return null;
+        }
+        return indexRequest.id(
+                exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, String.class));
+    }
+
+    /**
+     * Converts a document id into a get request for the index and type named
+     * in the exchange headers.
+     *
+     * @param id       the document id
+     * @param exchange the exchange supplying the headers
+     * @return the request, or {@code null} if the id is {@code null}
+     */
+    @Converter
+    public static GetRequest toGetRequest(String id, Exchange exchange) {
+        if (id == null) {
+            return null;
+        }
+
+        String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        return new GetRequest(indexName).type(indexType).id(id);
+    }
+
+    /**
+     * Converts a document id into a delete request for the index and type
+     * named in the exchange headers.
+     *
+     * @param id       the document id
+     * @param exchange the exchange supplying the headers
+     * @return the request, or {@code null} if the id is {@code null}
+     */
+    @Converter
+    public static DeleteRequest toDeleteRequest(String id, Exchange exchange) {
+        if (id == null) {
+            return null;
+        }
+
+        String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class);
+        String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class);
+        return new DeleteRequest().index(indexName).type(indexType).id(id);
+    }
+
+    /**
+     * Converts a list of documents into a bulk request containing one index
+     * request per document.
+     *
+     * @param documents the documents to index
+     * @param exchange  the exchange supplying the index name and type headers
+     * @return the request, or {@code null} if the list is {@code null} or
+     *         contains a document of an unsupported type
+     */
+    @Converter
+    public static BulkRequest toBulkRequest(List<Object> documents, Exchange exchange) {
+        if (documents == null) {
+            return null;
+        }
+
+        BulkRequest request = new BulkRequest();
+        for (Object document : documents) {
+            IndexRequest indexRequest = createIndexRequest(document, exchange);
+            if (indexRequest == null) {
+                // One unsupported element makes the whole list unconvertible;
+                // never add a null action to the bulk request.
+                return null;
+            }
+            request.add(indexRequest);
+        }
+        return request;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
 
b/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
new file mode 100644
index 0000000..53a8732
--- /dev/null
+++ 
b/components/camel-elasticsearch/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.camel.component.elasticsearch.converter.ElasticsearchActionRequestConverter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/69de2e3d/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 7bc2a2c..8398c2a 100644
--- 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -23,12 +23,20 @@ import java.util.Map;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
 
 public class ElasticsearchComponentTest extends CamelTestSupport {
 
@@ -217,9 +225,75 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
         assertNull("response source should be null", response.getSource());
     }
 
+    /**
+     * An {@link IndexRequest} body is sent as-is and the route answers with
+     * the id of the indexed document.
+     */
+    @Test
+    public void indexRequestBody() throws Exception {
+        // given: a request that carries its own index, type and id
+        IndexRequest indexRequest = new IndexRequest("foo", "bar", "testId");
+        indexRequest.source("{\"content\": \"hello\"}");
+
+        // when
+        String returnedId = template.requestBody("direct:index", indexRequest, String.class);
+
+        // then: the id from the request comes back
+        assertThat(returnedId, equalTo("testId"));
+    }
+
+    /**
+     * A {@link GetRequest} body fetches a document that was indexed through
+     * an {@link IndexRequest} body.
+     */
+    @Test
+    public void getRequestBody() throws Exception {
+        // given
+        GetRequest request = new GetRequest("foo").type("bar");
+
+        // when
+        String documentId = template.requestBody("direct:index",
+                new IndexRequest("foo", "bar", "testId")
+                        .source("{\"content\": \"hello\"}"), String.class);
+        GetResponse response = template.requestBody("direct:get",
+                request.id(documentId), GetResponse.class);
+
+        // then
+        assertThat(response, notNullValue());
+        // Actual value first, expected value in the matcher, so a failure is reported the right way round.
+        assertThat(response.getSourceAsMap().get("content"), equalTo((Object) "hello"));
+    }
+
+    /**
+     * A {@link DeleteRequest} body deletes a document that was indexed
+     * through an {@link IndexRequest} body.
+     */
+    @Test
+    public void deleteRequestBody() throws Exception {
+        // given
+        DeleteRequest request = new DeleteRequest("foo").type("bar");
+
+        // when
+        String documentId = template.requestBody("direct:index",
+                new IndexRequest("foo", "bar", "testId")
+                        .source("{\"content\": \"hello\"}"), String.class);
+        DeleteResponse response = template.requestBody("direct:delete",
+                request.id(documentId), DeleteResponse.class);
+
+        // then
+        assertThat(response, notNullValue());
+        // Actual value first, expected value in the matcher, so a failure is reported the right way round.
+        assertThat(response.getId(), equalTo(documentId));
+    }
+
+    /**
+     * A {@link BulkRequest} body is sent as-is and the route answers with the
+     * ids of the bulk items.
+     */
+    @Test
+    public void bulkRequestBody() throws Exception {
+        // given: a bulk request holding a single index request
+        IndexRequest item = new IndexRequest("foo", "bar", "baz").source("{\"content\": \"hello\"}");
+        BulkRequest bulk = new BulkRequest();
+        bulk.add(item);
+
+        // when
+        List<String> ids = template.requestBody("direct:bulk_index", bulk, List.class);
+
+        // then: exactly the one id from the request is reported
+        assertThat(ids, notNullValue());
+        assertThat(ids.size(), equalTo(1));
+        assertThat(ids, hasItem("baz"));
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
+            @Override
             public void configure() {
                 from("direct:start").to("elasticsearch://local");
                 
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");

Reply via email to