CAMEL-8299: Restored the old BULK_INDEX behavior (returning the list of indexed ids) and added BULK as a new operation that returns the BulkResponse.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4641a45c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4641a45c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4641a45c Branch: refs/heads/master Commit: 4641a45c227c169452e42dbd369537760dc48712 Parents: e4d991e Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Feb 10 06:56:41 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Feb 10 06:56:41 2015 +0100 ---------------------------------------------------------------------- .../ElasticsearchConfiguration.java | 3 ++- .../elasticsearch/ElasticsearchProducer.java | 21 ++++++++++++++++++-- .../ElasticsearchComponentTest.java | 21 +++++++++++++++++++- 3 files changed, 41 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index 22f9264..90a6361 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -34,6 +34,7 @@ public class ElasticsearchConfiguration { public static final String PARAM_OPERATION = "operation"; public static final String OPERATION_INDEX = "INDEX"; + public static final String OPERATION_BULK = "BULK"; public static final String OPERATION_BULK_INDEX = "BULK_INDEX"; public static final String 
OPERATION_GET_BY_ID = "GET_BY_ID"; public static final String OPERATION_DELETE = "DELETE"; @@ -62,7 +63,7 @@ public class ElasticsearchConfiguration { private boolean local; @UriParam private Boolean data; - @UriParam + @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") private String operation; @UriParam private String ip; http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 75ecd19..c60a1df 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -16,9 +16,14 @@ */ package org.apache.camel.component.elasticsearch; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; @@ -56,7 +61,12 @@ public class ElasticsearchProducer extends DefaultProducer { } else if (request instanceof GetRequest) { return ElasticsearchConfiguration.OPERATION_GET_BY_ID; } else if (request instanceof BulkRequest) { - return ElasticsearchConfiguration.OPERATION_BULK_INDEX; + // do we want bulk or bulk_index? 
+ if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) { + return ElasticsearchConfiguration.OPERATION_BULK_INDEX; + } else { + return ElasticsearchConfiguration.OPERATION_BULK; + } } else if (request instanceof DeleteRequest) { return ElasticsearchConfiguration.OPERATION_DELETE; } @@ -108,9 +118,16 @@ public class ElasticsearchProducer extends DefaultProducer { } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equals(operation)) { GetRequest getRequest = message.getBody(GetRequest.class); message.setBody(client.get(getRequest)); - } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) { + } else if (ElasticsearchConfiguration.OPERATION_BULK.equals(operation)) { BulkRequest bulkRequest = message.getBody(BulkRequest.class); message.setBody(client.bulk(bulkRequest).actionGet()); + } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) { + BulkRequest bulkRequest = message.getBody(BulkRequest.class); + List<String> indexedIds = new ArrayList<String>(); + for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) { + indexedIds.add(response.getId()); + } + message.setBody(indexedIds); } else if (ElasticsearchConfiguration.OPERATION_DELETE.equals(operation)) { DeleteRequest deleteRequest = message.getBody(DeleteRequest.class); message.setBody(client.delete(deleteRequest).actionGet()); http://git-wip-us.apache.org/repos/asf/camel/blob/4641a45c/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 62ab6a7..88b7299 100644 --- 
a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; @@ -268,7 +269,8 @@ public class ElasticsearchComponentTest extends CamelTestSupport { } @Test - public void bulkRequestBody() throws Exception { + @SuppressWarnings("unchecked") + public void bulkIndexRequestBody() throws Exception { // given BulkRequest request = new BulkRequest(); request.add(new IndexRequest("foo", "bar", "baz") @@ -284,6 +286,22 @@ public class ElasticsearchComponentTest extends CamelTestSupport { assertThat(indexedDocumentIds, hasItem("baz")); } + @Test + public void bulkRequestBody() throws Exception { + // given + BulkRequest request = new BulkRequest(); + request.add(new IndexRequest("foo", "bar", "baz") + .source("{\"content\": \"hello\"}")); + + // when + BulkResponse response = template.requestBody( + "direct:bulk", request, BulkResponse.class); + + // then + assertThat(response, notNullValue()); + assertEquals("baz", response.getItems()[0].getId()); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -294,6 +312,7 @@ public class ElasticsearchComponentTest extends CamelTestSupport { from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet"); from("direct:delete").to("elasticsearch://local?operation=DELETE&indexName=twitter&indexType=tweet"); 
from("direct:bulk_index").to("elasticsearch://local?operation=BULK_INDEX&indexName=twitter&indexType=tweet"); + from("direct:bulk").to("elasticsearch://local?operation=BULK&indexName=twitter&indexType=tweet"); //from("direct:indexWithIp").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost"); //from("direct:indexWithIpAndPort").to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet&ip=localhost&port=9300"); }