Repository: camel
Updated Branches:
  refs/heads/master 1d3682d3f -> fb7752760


Fix CAMEL-7681 : Add Bulk Index mode to Elasticsearch component
Add BULK_INDEX method
Add unit test that indexes 2 Maps in a row


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cea796a7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cea796a7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cea796a7

Branch: refs/heads/master
Commit: cea796a78e1e5b50c6342a46c4ca2913214c7a85
Parents: 1d3682d
Author: sebbrousse <seb.brou...@gmail.com>
Authored: Mon Aug 11 23:15:52 2014 +0200
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Aug 13 15:52:16 2014 +0800

----------------------------------------------------------------------
 .../ElasticsearchConfiguration.java             | 10 ++-
 .../elasticsearch/ElasticsearchProducer.java    | 83 ++++++++++++++++----
 .../ElasticsearchComponentTest.java             | 24 +++++-
 3 files changed, 96 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
index e1e8aa6..1703d50 100644
--- 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java
@@ -16,14 +16,15 @@
  */
 package org.apache.camel.component.elasticsearch;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeBuilder;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
 @UriParams
@@ -31,6 +32,7 @@ public class ElasticsearchConfiguration {
 
     public static final String PARAM_OPERATION = "operation";
     public static final String OPERATION_INDEX = "INDEX";
+    public static final String OPERATION_BULK_INDEX = "BULK_INDEX";
     public static final String OPERATION_GET_BY_ID = "GET_BY_ID";
     public static final String OPERATION_DELETE = "DELETE";
     public static final String PARAM_INDEX_ID = "indexId";

http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
index 3df10a2..2d190b0 100644
--- 
a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
+++ 
b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.component.elasticsearch;
 
-import java.util.Map;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.ExpectedBodyTypeException;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
 import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -30,6 +31,10 @@ import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Represents an Elasticsearch producer.
  */
@@ -62,6 +67,8 @@ public class ElasticsearchProducer extends DefaultProducer {
             getById(client, exchange);
         } else if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_DELETE)) {
             deleteById(client, exchange);
+        } else if 
(operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_BULK_INDEX)) {
+            addToIndexUsingBulk(client, exchange);
         } else {
             throw new 
IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value 
'" + operation + "' is not supported");
         }
@@ -114,7 +121,9 @@ public class ElasticsearchProducer extends DefaultProducer {
 
         IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, 
indexType);
 
-        if (!setIndexRequestSource(exchange.getIn(), prepareIndex)) {
+        Object document = extractDocumentFromMessage(exchange.getIn());
+
+        if (!setIndexRequestSource(document, prepareIndex)) {
             throw new ExpectedBodyTypeException(exchange, 
XContentBuilder.class);
         }
         ListenableActionFuture<IndexResponse> future = prepareIndex.execute();
@@ -122,10 +131,45 @@ public class ElasticsearchProducer extends 
DefaultProducer {
         exchange.getIn().setBody(response.getId());
     }
 
-    @SuppressWarnings("unchecked")
-    private boolean setIndexRequestSource(Message msg, IndexRequestBuilder 
builder) {
+    public void addToIndexUsingBulk(Client client, Exchange exchange) {
+        String indexName = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, 
String.class);
+        if (indexName == null) {
+            indexName = getEndpoint().getConfig().getIndexName();
+        }
+
+        String indexType = 
exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, 
String.class);
+        if (indexType == null) {
+            indexType = getEndpoint().getConfig().getIndexType();
+        }
+
+        log.debug("Preparing Bulk Request");
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+
+        List<?> body = exchange.getIn().getBody(List.class);;
+
+        for (Object document : body) {
+            IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, 
indexType);
+            log.trace("Indexing document : {}", document);
+            if (!setIndexRequestSource(document, prepareIndex)) {
+                throw new ExpectedBodyTypeException(exchange, 
XContentBuilder.class);
+            }
+            bulkRequest.add(prepareIndex);
+        }
+
+        ListenableActionFuture<BulkResponse> future = bulkRequest.execute();
+        BulkResponse bulkResponse = future.actionGet();
+
+        List<String> indexedIds = new LinkedList<String>();
+        for (BulkItemResponse response : bulkResponse.getItems()) {
+            indexedIds.add(response.getId());
+        }
+        log.debug("List of successfully indexed document ids : {}", 
indexedIds);
+        exchange.getIn().setBody(indexedIds);
+    }
+
+
+    private Object extractDocumentFromMessage(Message msg) {
         Object body = null;
-        boolean converted = false;
 
         // order is important
         Class<?>[] types = new Class[] {XContentBuilder.class, Map.class, 
byte[].class, String.class};
@@ -135,16 +179,25 @@ public class ElasticsearchProducer extends 
DefaultProducer {
             body = msg.getBody(type);
         }
 
-        if (body != null) {
+        return body;
+
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private boolean setIndexRequestSource(Object document, IndexRequestBuilder 
builder) {
+        boolean converted = false;
+
+        if (document != null) {
             converted = true;
-            if (body instanceof byte[]) {
-                builder.setSource((byte[])body);
-            } else if (body instanceof Map) {
-                builder.setSource((Map<String, Object>) body);
-            } else if (body instanceof String) {
-                builder.setSource((String)body);
-            } else if (body instanceof XContentBuilder) {
-                builder.setSource((XContentBuilder)body);
+            if (document instanceof byte[]) {
+                builder.setSource((byte[])document);
+            } else if (document instanceof Map) {
+                builder.setSource((Map<String, Object>) document);
+            } else if (document instanceof String) {
+                builder.setSource((String)document);
+            } else if (document instanceof XContentBuilder) {
+                builder.setSource((XContentBuilder)document);
             } else {
                 converted = false;
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
index cab6451..d1e9945 100644
--- 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
+++ 
b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.elasticsearch;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.elasticsearch.action.delete.DeleteResponse;
@@ -26,6 +24,11 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class ElasticsearchComponentTest extends CamelTestSupport {
 
     @Override
@@ -50,6 +53,22 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
     }
 
     @Test
+    public void testBulkIndex() throws Exception {
+        List<Map<String, String>> documents = new ArrayList<Map<String, 
String>>();
+        Map<String, String> document1 = new HashMap<String, String>();
+        document1.put("content1", "test1");
+        Map<String, String> document2 = new HashMap<String, String>();
+        document2.put("content2", "test2");
+
+        documents.add(document1);
+        documents.add(document2);
+
+        List indexIds = template.requestBody("direct:bulk_index", documents, 
List.class);
+        assertNotNull("indexIds should be set", indexIds);
+        assertCollectionSize("Indexed documents should match the size of 
documents", indexIds, documents.size());
+    }
+
+    @Test
     public void testGet() throws Exception {
         //first, INDEX a value
         Map<String, String> map = new HashMap<String, String>();
@@ -189,6 +208,7 @@ public class ElasticsearchComponentTest extends 
CamelTestSupport {
                 
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
                 
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
                 
from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet");
+                
from("direct:bulk_index").to("elasticsearch://local?operation=BULK_INDEX&indexName=twitter&indexType=tweet");
                 
//from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost");
                 
//from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300");
             }

Reply via email to