CAMEL-8299: Restored old behavior and added BULK as operation, to return the 
BulkResponse.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4641a45c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4641a45c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4641a45c

Branch: refs/heads/master
Commit: 4641a45c227c169452e42dbd369537760dc48712
Parents: e4d991e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Tue Feb 10 06:56:41 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Feb 10 06:56:41 2015 +0100

----------------------------------------------------------------------
 .../ElasticsearchConfiguration.java             |  3 ++-
 .../elasticsearch/ElasticsearchProducer.java    | 21 ++++++++++++++++++--
 .../ElasticsearchComponentTest.java             | 21 +++++++++++++++++++-
 3 files changed, 41 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index 22f9264..90a6361 100644
--- 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -34,6 +34,7 @@ public class ElasticsearchConfiguration {
 
     public static final String PARAM_OPERATION = "operation";
     public static final String OPERATION_INDEX = "INDEX";
+    public static final String OPERATION_BULK = "BULK";
     public static final String OPERATION_BULK_INDEX = "BULK_INDEX";
     public static final String OPERATION_GET_BY_ID = "GET_BY_ID";
     public static final String OPERATION_DELETE = "DELETE";
@@ -62,7 +63,7 @@ public class ElasticsearchConfiguration {
     private boolean local;
     @UriParam
     private Boolean data;
-    @UriParam
+    @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE")
     private String operation;
     @UriParam
     private String ip;

http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 75ecd19..c60a1df 100644
--- 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -16,9 +16,14 @@
  */
 package org.apache.camel.component.elasticsearch;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
@@ -56,7 +61,12 @@ public class ElasticsearchProducer extends DefaultProducer {
         } else if (request instanceof GetRequest) {
             return ElasticsearchConfiguration.OPERATION_GET_BY_ID;
         } else if (request instanceof BulkRequest) {
-            return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+            // do we want bulk or bulk_index?
+            if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) 
{
+                return ElasticsearchConfiguration.OPERATION_BULK_INDEX;
+            } else {
+                return ElasticsearchConfiguration.OPERATION_BULK;
+            }
         } else if (request instanceof DeleteRequest) {
             return ElasticsearchConfiguration.OPERATION_DELETE;
         }
@@ -108,9 +118,16 @@ public class ElasticsearchProducer extends DefaultProducer 
{
         } else if 
(ElasticsearchConfiguration.OPERATION_GET_BY_ID.equals(operation)) {
             GetRequest getRequest = message.getBody(GetRequest.class);
             message.setBody(client.get(getRequest));
-        } else if 
(ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) {
+        } else if 
(ElasticsearchConfiguration.OPERATION_BULK.equals(operation)) {
             BulkRequest bulkRequest = message.getBody(BulkRequest.class);
             message.setBody(client.bulk(bulkRequest).actionGet());
+        } else if 
(ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) {
+            BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+            List<String> indexedIds = new ArrayList<String>();
+            for (BulkItemResponse response : 
client.bulk(bulkRequest).actionGet().getItems()) {
+                indexedIds.add(response.getId());
+            }
+            message.setBody(indexedIds);
         } else if 
(ElasticsearchConfiguration.OPERATION_DELETE.equals(operation)) {
             DeleteRequest deleteRequest = message.getBody(DeleteRequest.class);
             message.setBody(client.delete(deleteRequest).actionGet());

http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index 62ab6a7..88b7299 100644
--- 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetRequest;
@@ -268,7 +269,8 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
     }
 
     @Test
-    public void bulkRequestBody() throws Exception {
+    @SuppressWarnings("unchecked")
+    public void bulkIndexRequestBody() throws Exception {
         // given
         BulkRequest request = new BulkRequest();
         request.add(new IndexRequest("foo", "bar", "baz")
@@ -284,6 +286,22 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
         assertThat(indexedDocumentIds, hasItem("baz"));
     }
 
+    @Test
+    public void bulkRequestBody() throws Exception {
+        // given
+        BulkRequest request = new BulkRequest();
+        request.add(new IndexRequest("foo", "bar", "baz")
+                .source("{\"content\": \"hello\"}"));
+
+        // when
+        BulkResponse response = template.requestBody(
+                "direct:bulk", request, BulkResponse.class);
+
+        // then
+        assertThat(response, notNullValue());
+        assertEquals("baz", response.getItems()[0].getId());
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -294,6 +312,7 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
                 
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
                 
from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
                 
from("direct:bulk_index").to("elasticsearch://local?operation=BULK_INDEX&indexName=twitter&indexType=tweet");
+                
from("direct:bulk").to("elasticsearch://local?operation=BULK&indexName=twitter&indexType=tweet");
                 
//from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
                 
//from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
             }

Reply via email to