Updated Branches: refs/heads/master a6b292489 -> bdad710a3
CAMEL-6507 Add aggregat ability to camel_mongodb with thanks to Pierre-Alban Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bdad710a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bdad710a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bdad710a Branch: refs/heads/master Commit: bdad710a3aa8a14bd5af9d13b60d60bd4545f7ec Parents: a6b2924 Author: Willem Jiang <ningji...@apache.org> Authored: Thu Jul 4 10:31:49 2013 +0800 Committer: Willem Jiang <ningji...@apache.org> Committed: Thu Jul 4 10:33:39 2013 +0800 ---------------------------------------------------------------------- .../component/mongodb/MongoDbOperation.java | 3 ++ .../component/mongodb/MongoDbProducer.java | 44 +++++++++++++++++++- .../mongodb/MongoDbOperationsTest.java | 21 ++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bdad710a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java index 8d11fde..710c119 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java @@ -33,6 +33,9 @@ public enum MongoDbOperation { // delete operations remove, + //aggregat + aggregat, + // others getDbStats, getColStats, http://git-wip-us.apache.org/repos/asf/camel/blob/bdad710a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java index ea04abe..7cd4022 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java @@ -19,6 +19,8 @@ package org.apache.camel.component.mongodb; import java.util.ArrayList; import java.util.List; +import com.mongodb.AggregationOutput; +import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; import com.mongodb.DB; @@ -113,7 +115,11 @@ public class MongoDbProducer extends DefaultProducer { case remove: doRemove(exchange); break; - + + case aggregat: + doAggregat(exchange); + break; + case getDbStats: doGetStats(exchange, MongoDbOperation.getDbStats); break; @@ -339,6 +345,42 @@ public class MongoDbProducer extends DefaultProducer { resultMessage.setBody(answer); } + /** + * All headers except collection and database are non available for this + * operation. + * + * @param exchange + * @throws Exception + */ + protected void doAggregat(Exchange exchange) throws Exception { + DBCollection dbCol = calculateCollection(exchange); + DBObject query = exchange.getIn().getMandatoryBody(DBObject.class); + + // Impossible with java driver to get the batch size and number to skip + Iterable<DBObject> dbIterator = null; + try { + AggregationOutput agregationResult = null; + + // Allow body to be a pipeline + // @see http://docs.mongodb.org/manual/core/aggregation/ + if (query instanceof BasicDBList) { + BasicDBList queryList = (BasicDBList)query; + agregationResult = dbCol.aggregate((DBObject)queryList.get(0), (BasicDBObject[])queryList + .subList(1, queryList.size()).toArray(new BasicDBObject[queryList.size() - 1])); + } else { + agregationResult = dbCol.aggregate(query); + } + + dbIterator = agregationResult.results(); + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.aggregat); + resultMessage.setBody(dbIterator); + + // Mongo Driver does not allow to read size and to paginate aggregate result + } catch (Exception e) { + // rethrow the exception + throw e; + } + } // --------- Convenience methods ----------------------- private DBCollection calculateCollection(Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/bdad710a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java index 32be1a3..c3bed5e 100644 --- a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java +++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java @@ -17,12 +17,14 @@ package org.apache.camel.component.mongodb; import java.util.Formatter; +import java.util.List; import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.WriteResult; import com.mongodb.util.JSON; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.junit.Test; @@ -157,6 +159,24 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest { } @Test + public void testAgregat() throws Exception { + // Test that the collection has 0 documents in it + assertEquals(0, testCollection.count()); + pumpDataIntoTestCollection(); + + // Repeat ten times, obtain 10 batches of 100 results each time + Object result = template + .requestBody("direct:aggregat", + "[{ $match : {$or : [{\"scientist\" : \"Darwin\"},{\"scientist\" : \"Einstein\"}]}},{ $group: { _id: \"$scientist\", count: { $sum: 1 }} } ]"); + assertTrue("Result is not of type List", result instanceof List); + + @SuppressWarnings("unchecked") + List<DBObject> resultList = (List<DBObject>)result; + assertListSize("Result does not contain 2 elements", resultList, 2); + // TODO Add more asserts + } + + @Test public void testDbStats() throws Exception { assertEquals(0, testCollection.count()); Object result = template.requestBody("direct:getDbStats", "irrelevantBody"); @@ -210,6 +230,7 @@ public class MongoDbOperationsTest extends AbstractMongoDbTest { from("direct:save").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=save&writeConcern=SAFE"); from("direct:update").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=update&writeConcern=SAFE"); from("direct:remove").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=remove&writeConcern=SAFE"); + from("direct:aggregat").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=aggregat&writeConcern=SAFE"); from("direct:getDbStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getDbStats"); from("direct:getColStats").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=getColStats");