Updated Branches: refs/heads/camel-2.10.x 180aef79f -> a2ac18a1d
CAMEL-6507 Add aggregate ability to camel-mongodb, with thanks to Pierre-Alban. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1bd4d45e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1bd4d45e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1bd4d45e Branch: refs/heads/camel-2.10.x Commit: 1bd4d45eba56ad76ac1041f976ebde6a1587520a Parents: 180aef7 Author: Willem Jiang <ningji...@apache.org> Authored: Thu Jul 4 10:31:49 2013 +0800 Committer: Raúl Kripalani <ra...@apache.org> Committed: Sun Jul 7 23:18:09 2013 +0100 ---------------------------------------------------------------------- .../component/mongodb/MongoDbOperation.java | 3 ++ .../component/mongodb/MongoDbProducer.java | 44 +++++++++++++++++++- .../mongodb/MongoDbOperationsTest.java | 23 +++++++++- 3 files changed, 68 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1bd4d45e/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java index 8d11fde..bb6ee6a 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java @@ -33,6 +33,9 @@ public enum MongoDbOperation { // delete operations remove, + //aggregat + aggregate, + // others getDbStats, getColStats, http://git-wip-us.apache.org/repos/asf/camel/blob/1bd4d45e/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java 
---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java index ea04abe..166c62b 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java @@ -19,6 +19,8 @@ package org.apache.camel.component.mongodb; import java.util.ArrayList; import java.util.List; +import com.mongodb.AggregationOutput; +import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; import com.mongodb.DB; @@ -113,7 +115,11 @@ public class MongoDbProducer extends DefaultProducer { case remove: doRemove(exchange); break; - + + case aggregate: + doAggregate(exchange); + break; + case getDbStats: doGetStats(exchange, MongoDbOperation.getDbStats); break; @@ -339,6 +345,42 @@ public class MongoDbProducer extends DefaultProducer { resultMessage.setBody(answer); } + /** + * All headers except collection and database are non available for this + * operation. 
+ * + * @param exchange + * @throws Exception + */ + protected void doAggregate(Exchange exchange) throws Exception { + DBCollection dbCol = calculateCollection(exchange); + DBObject query = exchange.getIn().getMandatoryBody(DBObject.class); + + // Impossible with java driver to get the batch size and number to skip + Iterable<DBObject> dbIterator = null; + try { + AggregationOutput aggregationResult = null; + + // Allow body to be a pipeline + // @see http://docs.mongodb.org/manual/core/aggregation/ + if (query instanceof BasicDBList) { + BasicDBList queryList = (BasicDBList)query; + aggregationResult = dbCol.aggregate((DBObject)queryList.get(0), (BasicDBObject[])queryList + .subList(1, queryList.size()).toArray(new BasicDBObject[queryList.size() - 1])); + } else { + aggregationResult = dbCol.aggregate(query); + } + + dbIterator = aggregationResult.results(); + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.aggregate); + resultMessage.setBody(dbIterator); + + // Mongo Driver does not allow to read size and to paginate aggregate result + } catch (Exception e) { + // rethrow the exception + throw e; + } + } // --------- Convenience methods ----------------------- private DBCollection calculateCollection(Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/1bd4d45e/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java index cb09dd8..87fc9ce 100644 --- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java +++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java @@ -17,14 +17,16 @@ package 
org.apache.camel.component.mongodb; import java.util.Formatter; +import java.util.List; import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.WriteResult; import com.mongodb.util.JSON; -import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; + import org.junit.Test; public class MongoDbOperationsTest extends AbstractMongoDbTest { @@ -157,6 +159,24 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest { } @Test + public void testAggregate() throws Exception { + // Test that the collection has 0 documents in it + assertEquals(0, testCollection.count()); + pumpDataIntoTestCollection(); + + // Repeat ten times, obtain 10 batches of 100 results each time + Object result = template + .requestBody("direct:aggregate", + "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]"); + assertTrue("Result is not of type List", result instanceof List); + + @SuppressWarnings("unchecked") + List<DBObject> resultList = (List<DBObject>)result; + assertListSize("Result does not contain 2 elements", resultList, 2); + // TODO Add more asserts + } + + @Test public void testDbStats() throws Exception { assertEquals(0, testCollection.count()); Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); @@ -210,6 +230,7 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest { from("direct:save").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeConcern=SAFE"); from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE"); from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE"); + 
from("direct:aggregate").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregate&writeConcern=SAFE"); from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats"); from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");