Repository: camel Updated Branches: refs/heads/master 1d3682d3f -> fb7752760
Fix CAMEL-7681 : Add Bulk Index mode to Elasticsearch component Add BULK_INDEX method Add unit test that index 2 Map in a row Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cea796a7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cea796a7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cea796a7 Branch: refs/heads/master Commit: cea796a78e1e5b50c6342a46c4ca2913214c7a85 Parents: 1d3682d Author: sebbrousse <seb.brou...@gmail.com> Authored: Mon Aug 11 23:15:52 2014 +0200 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Aug 13 15:52:16 2014 +0800 ---------------------------------------------------------------------- .../ElasticsearchConfiguration.java | 10 ++- .../elasticsearch/ElasticsearchProducer.java | 83 ++++++++++++++++---- .../ElasticsearchComponentTest.java | 24 +++++- 3 files changed, 96 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index e1e8aa6..1703d50 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -16,14 +16,15 @@ */ package org.apache.camel.component.elasticsearch; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; - import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + import static org.elasticsearch.node.NodeBuilder.nodeBuilder; @UriParams @@ -31,6 +32,7 @@ public class ElasticsearchConfiguration { public static final String PARAM_OPERATION = "operation"; public static final String OPERATION_INDEX = "INDEX"; + public static final String OPERATION_BULK_INDEX = "BULK_INDEX"; public static final String OPERATION_GET_BY_ID = "GET_BY_ID"; public static final String OPERATION_DELETE = "DELETE"; public static final String PARAM_INDEX_ID = "indexId"; http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 3df10a2..2d190b0 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -16,13 +16,14 @@ */ package org.apache.camel.component.elasticsearch; -import java.util.Map; - import org.apache.camel.Exchange; import org.apache.camel.ExpectedBodyTypeException; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -30,6 +31,10 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentBuilder; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + /** * Represents an Elasticsearch producer. */ @@ -62,6 +67,8 @@ public class ElasticsearchProducer extends DefaultProducer { getById(client, exchange); } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_DELETE)) { deleteById(client, exchange); + } else if (operation.equalsIgnoreCase(ElasticsearchConfiguration.OPERATION_BULK_INDEX)) { + addToIndexUsingBulk(client, exchange); } else { throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported"); } @@ -114,7 +121,9 @@ public class ElasticsearchProducer extends DefaultProducer { IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType); - if (!setIndexRequestSource(exchange.getIn(), prepareIndex)) { + Object document = extractDocumentFromMessage(exchange.getIn()); + + if (!setIndexRequestSource(document, prepareIndex)) { throw new ExpectedBodyTypeException(exchange, XContentBuilder.class); } ListenableActionFuture<IndexResponse> future = prepareIndex.execute(); @@ -122,10 +131,45 @@ public class ElasticsearchProducer extends DefaultProducer { exchange.getIn().setBody(response.getId()); } - @SuppressWarnings("unchecked") - private boolean setIndexRequestSource(Message msg, IndexRequestBuilder builder) { + public void addToIndexUsingBulk(Client client, Exchange exchange) { + String indexName = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); + if (indexName == null) { + indexName = getEndpoint().getConfig().getIndexName(); + } + + String indexType = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); + if (indexType == null) { + indexType = getEndpoint().getConfig().getIndexType(); + } + + log.debug("Preparing Bulk Request"); + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + List<?> body = exchange.getIn().getBody(List.class);; + + for (Object document : body) { + IndexRequestBuilder prepareIndex = client.prepareIndex(indexName, indexType); + log.trace("Indexing document : {}", document); + if (!setIndexRequestSource(document, prepareIndex)) { + throw new ExpectedBodyTypeException(exchange, XContentBuilder.class); + } + bulkRequest.add(prepareIndex); + } + + ListenableActionFuture<BulkResponse> future = bulkRequest.execute(); + BulkResponse bulkResponse = future.actionGet(); + + List<String> indexedIds = new LinkedList<String>(); + for (BulkItemResponse response : bulkResponse.getItems()) { + indexedIds.add(response.getId()); + } + log.debug("List of successfully indexed document ids : {}", indexedIds); + exchange.getIn().setBody(indexedIds); + } + + + private Object extractDocumentFromMessage(Message msg) { Object body = null; - boolean converted = false; // order is important Class<?>[] types = new Class[] {XContentBuilder.class, Map.class, byte[].class, String.class}; @@ -135,16 +179,25 @@ public class ElasticsearchProducer extends DefaultProducer { body = msg.getBody(type); } - if (body != null) { + return body; + + } + + + @SuppressWarnings("unchecked") + private boolean setIndexRequestSource(Object document, IndexRequestBuilder builder) { + boolean converted = false; + + if (document != null) { converted = true; - if (body instanceof byte[]) { - builder.setSource((byte[])body); - } else if (body instanceof Map) { - builder.setSource((Map<String, Object>) body); - } else if (body instanceof String) { - builder.setSource((String)body); - } else if (body instanceof XContentBuilder) { - builder.setSource((XContentBuilder)body); + if (document instanceof byte[]) { + builder.setSource((byte[])document); + } else if (document instanceof Map) { + builder.setSource((Map<String, Object>) document); + } else if (document instanceof String) { + builder.setSource((String)document); + } else if (document instanceof XContentBuilder) { + builder.setSource((XContentBuilder)document); } else { converted = false; } http://git-wip-us.apache.org/repos/asf/camel/blob/cea796a7/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index cab6451..d1e9945 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.elasticsearch; -import java.util.HashMap; -import java.util.Map; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.elasticsearch.action.delete.DeleteResponse; @@ -26,6 +24,11 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class ElasticsearchComponentTest extends CamelTestSupport { @Override @@ -50,6 +53,22 @@ public class ElasticsearchComponentTest extends CamelTestSupport { } @Test + public void testBulkIndex() throws Exception { + List<Map<String, String>> documents = new ArrayList<Map<String, String>>(); + Map<String, String> document1 = new HashMap<String, String>(); + document1.put("content1", "test1"); + Map<String, String> document2 = new HashMap<String, String>(); + document2.put("content2", "test2"); + + documents.add(document1); + documents.add(document2); + + List indexIds = template.requestBody("direct:bulk_index", documents, List.class); + assertNotNull("indexIds should be set", indexIds); + assertCollectionSize("Indexed documents should match the size of documents", indexIds, documents.size()); + } + + @Test public void testGet() throws Exception { //first, INDEX a value Map<String, String> map = new HashMap<String, String>(); @@ -189,6 +208,7 @@ public class ElasticsearchComponentTest extends CamelTestSupport { from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet"); + from("direct:bulk_index").to("elasticsearch://local?operation=BULK_INDEX&indexName=twitter&indexType=tweet"); //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost"); //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300"); }