CAMEL-6000: Add collectionIndex option to camel-mongodb. Thanks to Pierre-Yves Nicolas for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5f42992 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5f42992 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5f42992 Branch: refs/heads/master Commit: d5f429924c16418a9ceb672d40f01c2743b48af5 Parents: 9ecc122 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Aug 7 16:43:24 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Aug 7 18:33:33 2013 +0200 ---------------------------------------------------------------------- .../component/mongodb/MongoDbConstants.java | 10 +- .../component/mongodb/MongoDbEndpoint.java | 280 +++++++++++++------ .../component/mongodb/MongoDbProducer.java | 146 +++++----- .../component/mongodb/MongoDbIndexTest.java | 193 +++++++++++++ 4 files changed, 476 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java index 784a4e9..3eff7c5 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java @@ -17,8 +17,8 @@ package org.apache.camel.component.mongodb; public final class MongoDbConstants { - - public static final String OPERATION_HEADER = "CamelMongoDbOperation"; + + public static final String OPERATION_HEADER = "CamelMongoDbOperation"; public static final String RESULT_TOTAL_SIZE = "CamelMongoDbResultTotalSize"; public static final String RESULT_PAGE_SIZE = "CamelMongoDbResultPageSize"; public static final String FIELDS_FILTER = "CamelMongoDbFieldsFilter"; @@ -32,11 +32,13 @@ public final class MongoDbConstants { public static final String SORT_BY = "CamelMongoDbSortBy"; public static final String DATABASE = "CamelMongoDbDatabase"; public static final String COLLECTION = "CamelMongoDbCollection"; + public static final String COLLECTION_INDEX = "CamelMongoDbCollectionIndex"; public static final String WRITECONCERN = "CamelMongoDbWriteConcern"; public static final String LIMIT = "CamelMongoDbLimit"; public static final String FROM_TAILABLE = "CamelMongoDbTailable"; public static final String WRITERESULT = "CamelMongoWriteResult"; - private MongoDbConstants() { } - + private MongoDbConstants() { + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java index 97d410e..c216ccb 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java @@ -16,6 +16,13 @@ */ package org.apache.camel.component.mongodb; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; @@ -23,7 +30,6 @@ import com.mongodb.Mongo; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; - import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -33,14 +39,14 @@ import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.DefaultMessage; import org.apache.camel.util.ObjectHelper; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Represents a MongoDb endpoint. - * It is responsible for creating {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances. - * It accepts a number of options to customise the behaviour of consumers and producers. + * Represents a MongoDb endpoint. It is responsible for creating + * {@link MongoDbProducer} and {@link MongoDbTailableCursorConsumer} instances. + * It accepts a number of options to customise the behaviour of consumers and + * producers. */ public class MongoDbEndpoint extends DefaultEndpoint { @@ -48,6 +54,7 @@ public class MongoDbEndpoint extends DefaultEndpoint { private Mongo mongoConnection; private String database; private String collection; + private String collectionIndex; private MongoDbOperation operation; private boolean createCollection = true; private boolean invokeGetLastError; // = false @@ -60,16 +67,16 @@ public class MongoDbEndpoint extends DefaultEndpoint { private MongoDbConsumerType consumerType; private long cursorRegenerationDelay = 1000L; private String tailTrackIncreasingField; - + // persitent tail tracking private boolean persistentTailTracking; // = false; private String persistentId; private String tailTrackDb; private String tailTrackCollection; private String tailTrackField; - + private MongoDbTailTrackingConfig tailTrackingConfig; - + private DBCollection dbCollection; private DB db; @@ -88,7 +95,7 @@ public class MongoDbEndpoint extends DefaultEndpoint { } // ======= Implementation methods ===================================== - + public Producer createProducer() throws Exception { validateOptions('P'); initializeConnection(); @@ -100,12 +107,12 @@ public class MongoDbEndpoint extends DefaultEndpoint { // we never create the collection createCollection = false; initializeConnection(); - + // select right consumer type if (consumerType == null) { consumerType = MongoDbConsumerType.tailable; } - + Consumer consumer; if (consumerType == MongoDbConsumerType.tailable) { consumer = new MongoDbTailableCursorConsumer(this, processor); @@ -121,17 +128,17 @@ public class MongoDbEndpoint extends DefaultEndpoint { // make our best effort to validate, options with defaults are checked against their defaults, which is not always a guarantee that // they haven't been explicitly set, but it is enough if (role == 'P') { - if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb) + if (!ObjectHelper.isEmpty(consumerType) || persistentTailTracking || !ObjectHelper.isEmpty(tailTrackDb) || !ObjectHelper.isEmpty(tailTrackCollection) || !ObjectHelper.isEmpty(tailTrackField) || cursorRegenerationDelay != 1000L) { throw new IllegalArgumentException("consumerType, tailTracking, cursorRegenerationDelay options cannot appear on a producer endpoint"); } } else if (role == 'C') { - if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null + if (!ObjectHelper.isEmpty(operation) || !ObjectHelper.isEmpty(writeConcern) || writeConcernRef != null || readPreference != null || dynamicity || invokeGetLastError) { - throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError " + throw new IllegalArgumentException("operation, writeConcern, writeConcernRef, readPreference, dynamicity, invokeGetLastError " + "options cannot appear on a consumer endpoint"); } - + if (consumerType == MongoDbConsumerType.tailable) { if (tailTrackIncreasingField == null) { throw new IllegalArgumentException("tailTrackIncreasingField option must be set for tailable cursor MongoDB consumer endpoint"); @@ -140,7 +147,7 @@ public class MongoDbEndpoint extends DefaultEndpoint { throw new IllegalArgumentException("persistentId is compulsory for persistent tail tracking"); } } - + } else { throw new IllegalArgumentException("Unknown endpoint role"); } @@ -149,9 +156,11 @@ public class MongoDbEndpoint extends DefaultEndpoint { public boolean isSingleton() { return true; } - + /** - * Initialises the MongoDB connection using the Mongo object provided to the endpoint + * Initialises the MongoDB connection using the Mongo object provided to the + * endpoint + * * @throws CamelMongoDbException */ public void initializeConnection() throws CamelMongoDbException { @@ -167,20 +176,66 @@ public class MongoDbEndpoint extends DefaultEndpoint { throw new CamelMongoDbException("Could not initialise MongoDbComponent. Collection " + collection + " and createCollection is false."); } dbCollection = db.getCollection(collection); - - LOG.info("MongoDb component initialised and endpoint bound to MongoDB collection with the following paramters. Address list: {}, Db: {}, Collection: {}", - new Object[] {mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()}); + + LOG.debug("MongoDb component initialised and endpoint bound to MongoDB collection with the following parameters. Address list: {}, Db: {}, Collection: {}", + new Object[]{mongoConnection.getAllAddress().toString(), db.getName(), dbCollection.getName()}); + + try { + if (ObjectHelper.isNotEmpty(collectionIndex)) { + ensureIndex(dbCollection, createIndex()); + } + } catch (Exception e) { + throw new CamelMongoDbException("Error creating index", e); + } + } + + /** + * Add Index + * + * @param collection + */ + public void ensureIndex(DBCollection collection, List<DBObject> dynamicIndex) { + collection.dropIndexes(); + if (dynamicIndex != null && !dynamicIndex.isEmpty()) { + for (DBObject index : dynamicIndex) { + LOG.debug("create BDObject Index {}", index); + collection.ensureIndex(index); + } + } + } + + /** + * Create technical list index + * + * @return technical list index + */ + @SuppressWarnings("unchecked") + public List<DBObject> createIndex() throws Exception { + List<DBObject> indexList = new ArrayList<DBObject>(); + + if (ObjectHelper.isNotEmpty(collectionIndex)) { + HashMap<String, String> indexMap = new ObjectMapper().readValue(collectionIndex, HashMap.class); + + for (Map.Entry<String, String> set : indexMap.entrySet()) { + DBObject index = new BasicDBObject(); + index.put(set.getKey(), set.getValue()); + + indexList.add(index); + } + } + return indexList; } /** - * Applies validation logic specific to this endpoint type. If everything succeeds, continues initialization + * Applies validation logic specific to this endpoint type. If everything + * succeeds, continues initialization */ @Override protected void doStart() throws Exception { if (writeConcern != null && writeConcernRef != null) { - LOG.error("Cannot set both writeConcern and writeConcernRef at the same time. Respective values: {}, {}. " - + "Aborting initialization.", new Object[] {writeConcern, writeConcernRef}); - throw new IllegalArgumentException("Cannot set both writeConcern and writeConcernRef at the same time on MongoDB endpoint"); + String msg = "Cannot set both writeConcern and writeConcernRef at the same time. Respective values: " + writeConcern + + ", " + writeConcernRef + ". Aborting initialization."; + throw new IllegalArgumentException(msg); } setWriteReadOptionsOnConnection(); @@ -193,12 +248,12 @@ public class MongoDbEndpoint extends DefaultEndpoint { message.setHeader(MongoDbConstants.DATABASE, database); message.setHeader(MongoDbConstants.COLLECTION, collection); message.setHeader(MongoDbConstants.FROM_TAILABLE, true); - + message.setBody(dbObj); exchange.setIn(message); return exchange; } - + private void setWriteReadOptionsOnConnection() { // Set the WriteConcern if (writeConcern != null) { @@ -206,18 +261,19 @@ public class MongoDbEndpoint extends DefaultEndpoint { } else if (writeConcernRef != null) { mongoConnection.setWriteConcern(writeConcernRef); } - + // Set the ReadPreference if (readPreference != null) { mongoConnection.setReadPreference(readPreference); } } - - - // ======= Getters and setters =============================================== - + + // ======= Getters and setters + // =============================================== + /** * Sets the name of the MongoDB collection to bind to this endpoint + * * @param collection collection name */ public void setCollection(String collection) { @@ -229,7 +285,21 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Sets the operation this endpoint will execute against MongoDB. For possible values, see {@link MongoDbOperation}. + * Sets the collection index (JSON FORMAT : { "field1" : "order", "field2" : + * "order"}) + */ + public void setCollectionIndex(String collectionIndex) { + this.collectionIndex = collectionIndex; + } + + public String getCollectionIndex() { + return collectionIndex; + } + + /** + * Sets the operation this endpoint will execute against MongoDB. For + * possible values, see {@link MongoDbOperation}. + * * @param operation name of the operation as per catalogued values * @throws CamelMongoDbException */ @@ -247,6 +317,7 @@ public class MongoDbEndpoint extends DefaultEndpoint { /** * Sets the name of the MongoDB database to target + * * @param database name of the MongoDB database */ public void setDatabase(String database) { @@ -258,7 +329,9 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Create collection during initialisation if it doesn't exist. Default is true. + * Create collection during initialisation if it doesn't exist. Default is + * true. + * * @param createCollection true or false */ public void setCreateCollection(boolean createCollection) { @@ -276,9 +349,10 @@ public class MongoDbEndpoint extends DefaultEndpoint { public DBCollection getDbCollection() { return dbCollection; } - + /** * Sets the Mongo instance that represents the backing connection + * * @param mongoConnection the connection to the database */ public void setMongoConnection(Mongo mongoConnection) { @@ -290,10 +364,14 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones. - * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method. + * Set the {@link WriteConcern} for write operations on MongoDB using the + * standard ones. Resolved from the fields of the WriteConcern class by + * calling the {@link WriteConcern#valueOf(String)} method. + * * @param writeConcern the standard name of the WriteConcern - * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible options</a> + * @see <a + * href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html#valueOf(java.lang.String)">possible + * options</a> */ public void setWriteConcern(String writeConcern) { this.writeConcern = WriteConcern.valueOf(writeConcern); @@ -304,9 +382,11 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with every operation. By default, MongoDB does not wait - * for the write operation to occur before returning. If set to true, each exchange will only return after the write operation - * has actually occurred in MongoDB. + * Instructs this endpoint to invoke {@link WriteResult#getLastError()} with + * every operation. By default, MongoDB does not wait for the write + * operation to occur before returning. If set to true, each exchange will + * only return after the write operation has actually occurred in MongoDB. + * * @param invokeGetLastError true or false */ public void setInvokeGetLastError(boolean invokeGetLastError) { @@ -320,16 +400,17 @@ public class MongoDbEndpoint extends DefaultEndpoint { /** * Set the {@link WriteConcern} for write operations on MongoDB, passing in the bean ref to a custom WriteConcern which exists in the Registry. * You can also use standard WriteConcerns by passing in their key. See the {@link #setWriteConcern(String) setWriteConcern} method. + * * @param writeConcernRef the name of the bean in the registry that represents the WriteConcern to use */ public void setWriteConcernRef(String writeConcernRef) { WriteConcern wc = this.getCamelContext().getRegistry().lookupByNameAndType(writeConcernRef, WriteConcern.class); if (wc == null) { - LOG.error("Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the " - + "provided bean name ({}) is correct. Aborting initialization.", writeConcernRef); - throw new IllegalArgumentException("Camel MongoDB component could not find the WriteConcern in the Registry"); + String msg = "Camel MongoDB component could not find the WriteConcern in the Registry. Verify that the " + + "provided bean name (" + writeConcernRef + ") is correct. Aborting initialization."; + throw new IllegalArgumentException(msg); } - + this.writeConcernRef = wc; } @@ -337,9 +418,11 @@ public class MongoDbEndpoint extends DefaultEndpoint { return writeConcernRef; } - /** - * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read preferences set directly on the connection will be - * overridden by this setting. + /** + * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read + * preferences set directly on the connection will be overridden by this + * setting. + * * @param readPreference the bean name of the read preference to set */ public void setReadPreference(String readPreference) { @@ -354,23 +437,28 @@ public class MongoDbEndpoint extends DefaultEndpoint { break; } } - - LOG.error("Could not resolve specified ReadPreference of type {}. Read preferences are resolved from inner " - + "classes of com.mongodb.ReadPreference.", readPreference); - throw new IllegalArgumentException("MongoDB endpoint could not resolve specified ReadPreference"); + + String msg = "Could not resolve specified ReadPreference of type " + readPreference + + ". Read preferences are resolved from inner classes of com.mongodb.ReadPreference."; + throw new IllegalArgumentException(msg); } - + public ReadPreference getReadPreference() { return readPreference; } /** - * Sets whether this endpoint will attempt to dynamically resolve the target database and collection from the incoming Exchange properties. - * Can be used to override at runtime the database and collection specified on the otherwise static endpoint URI. - * It is disabled by default to boost performance. Enabling it will take a minimal performance hit. + * Sets whether this endpoint will attempt to dynamically resolve the target + * database and collection from the incoming Exchange properties. Can be + * used to override at runtime the database and collection specified on the + * otherwise static endpoint URI. It is disabled by default to boost + * performance. Enabling it will take a minimal performance hit. + * + * @param dynamicity true or false indicated whether target database and + * collection should be calculated dynamically based on Exchange + * properties. * @see MongoDbConstants#DATABASE * @see MongoDbConstants#COLLECTION - * @param dynamicity true or false indicated whether target database and collection should be calculated dynamically based on Exchange properties. */ public void setDynamicity(boolean dynamicity) { this.dynamicity = dynamicity; @@ -381,7 +469,8 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Reserved for future use, when more consumer types are supported. + * Reserved for future use, when more consumer types are supported. + * * @param consumerType key of the consumer type * @throws CamelMongoDbException */ @@ -396,15 +485,17 @@ public class MongoDbEndpoint extends DefaultEndpoint { public MongoDbConsumerType getConsumerType() { return consumerType; } - + public String getTailTrackDb() { return tailTrackDb; } /** - * Indicates what database the tail tracking mechanism will persist to. If not specified, the current database will - * be picked by default. Dynamicity will not be taken into account even if enabled, i.e. the tail tracking database - * will not vary past endpoint initialisation. + * Indicates what database the tail tracking mechanism will persist to. If + * not specified, the current database will be picked by default. Dynamicity + * will not be taken into account even if enabled, i.e. the tail tracking + * database will not vary past endpoint initialisation. + * * @param tailTrackDb database name */ public void setTailTrackDb(String tailTrackDb) { @@ -416,8 +507,10 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Collection where tail tracking information will be persisted. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} - * will be used by default. + * Collection where tail tracking information will be persisted. If not + * specified, {@link MongoDbTailTrackingConfig#DEFAULT_COLLECTION} will be + * used by default. + * * @param tailTrackCollection collection name */ public void setTailTrackCollection(String tailTrackCollection) { @@ -429,8 +522,9 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Field where the last tracked value will be placed. If not specified, {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} - * will be used by default. + * Field where the last tracked value will be placed. If not specified, + * {@link MongoDbTailTrackingConfig#DEFAULT_FIELD} will be used by default. + * * @param tailTrackField field name */ public void setTailTrackField(String tailTrackField) { @@ -438,8 +532,11 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * Enable persistent tail tracking, which is a mechanism to keep track of the last consumed message across system restarts. - * The next time the system is up, the endpoint will recover the cursor from the point where it last stopped slurping records. + * Enable persistent tail tracking, which is a mechanism to keep track of + * the last consumed message across system restarts. The next time the + * system is up, the endpoint will recover the cursor from the point where + * it last stopped slurping records. + * * @param persistentTailTracking true or false */ public void setPersistentTailTracking(boolean persistentTailTracking) { @@ -449,14 +546,16 @@ public class MongoDbEndpoint extends DefaultEndpoint { public boolean isPersistentTailTracking() { return persistentTailTracking; } - + /** - * Correlation field in the incoming record which is of increasing nature and will be used to position the tailing cursor every - * time it is generated. - * The cursor will be (re)created with a query of type: tailTrackIncreasingField > lastValue (possibly recovered from persistent - * tail tracking). - * Can be of type Integer, Date, String, etc. - * NOTE: No support for dot notation at the current time, so the field should be at the top level of the document. + * Correlation field in the incoming record which is of increasing nature + * and will be used to position the tailing cursor every time it is + * generated. The cursor will be (re)created with a query of type: + * tailTrackIncreasingField > lastValue (possibly recovered from persistent + * tail tracking). Can be of type Integer, Date, String, etc. NOTE: No + * support for dot notation at the current time, so the field should be at + * the top level of the document. + * * @param tailTrackIncreasingField */ public void setTailTrackIncreasingField(String tailTrackIncreasingField) { @@ -469,16 +568,20 @@ public class MongoDbEndpoint extends DefaultEndpoint { public MongoDbTailTrackingConfig getTailTrackingConfig() { if (tailTrackingConfig == null) { - tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, - tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, tailTrackField, getPersistentId()); + tailTrackingConfig = new MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, tailTrackDb == null ? database : tailTrackDb, tailTrackCollection, + tailTrackField, getPersistentId()); } - return tailTrackingConfig; + return tailTrackingConfig; } /** - * MongoDB tailable cursors will block until new data arrives. If no new data is inserted, after some time the cursor will be automatically - * freed and closed by the MongoDB server. The client is expected to regenerate the cursor if needed. This value specifies the time to wait - * before attempting to fetch a new cursor, and if the attempt fails, how long before the next attempt is made. Default value is 1000ms. + * MongoDB tailable cursors will block until new data arrives. If no new + * data is inserted, after some time the cursor will be automatically freed + * and closed by the MongoDB server. The client is expected to regenerate + * the cursor if needed. This value specifies the time to wait before + * attempting to fetch a new cursor, and if the attempt fails, how long + * before the next attempt is made. Default value is 1000ms. + * * @param cursorRegenerationDelay delay specified in milliseconds */ public void setCursorRegenerationDelay(long cursorRegenerationDelay) { @@ -490,9 +593,12 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * One tail tracking collection can host many trackers for several tailable consumers. - * To keep them separate, each tracker should have its own unique persistentId. - * @param persistentId the value of the persistent ID to use for this tailable consumer + * One tail tracking collection can host many trackers for several tailable + * consumers. To keep them separate, each tracker should have its own unique + * persistentId. + * + * @param persistentId the value of the persistent ID to use for this + * tailable consumer */ public void setPersistentId(String persistentId) { this.persistentId = persistentId; @@ -507,8 +613,10 @@ public class MongoDbEndpoint extends DefaultEndpoint { } /** - * In write operations, it determines whether instead of returning {@link WriteResult} as the body of the OUT - * message, we transfer the IN message to the OUT and attach the WriteResult as a header. + * In write operations, it determines whether instead of returning + * {@link WriteResult} as the body of the OUT message, we transfer the IN + * message to the OUT and attach the WriteResult as a header. + * * @param writeResultAsHeader flag to indicate if this option is enabled */ public void setWriteResultAsHeader(boolean writeResultAsHeader) { http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java index 1bf5a45..c18e696 100644 --- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java +++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java @@ -67,17 +67,19 @@ public class MongoDbProducer extends DefaultProducer { throw new CamelMongoDbException("Operation specified on header is not supported. Value: " + header, e); } } - + try { invokeOperation(operation, exchange); } catch (Exception e) { throw MongoDbComponent.wrapInCamelMongoDbException(e); } - + } /** - * Entry method that selects the appropriate MongoDB operation and executes it + * Entry method that selects the appropriate MongoDB operation and executes + * it + * * @param operation * @param exchange * @throws Exception @@ -87,7 +89,7 @@ public class MongoDbProducer extends DefaultProducer { case count: doCount(exchange); break; - + case findOneByQuery: doFindOneByQuery(exchange); break; @@ -134,14 +136,15 @@ public class MongoDbProducer extends DefaultProducer { } // ----------- MongoDB operations ---------------- - + protected void doGetStats(Exchange exchange, MongoDbOperation operation) throws Exception { DBObject result = null; - + if (operation == MongoDbOperation.getColStats) { result = calculateCollection(exchange).getStats(); } else if (operation == MongoDbOperation.getDbStats) { - // if it's a DB, also take into account the dynamicity option and the DB that is used + // if it's a DB, also take into account the dynamicity option and + // the DB that is used result = calculateCollection(exchange).getDB().getStats(); } else { throw new CamelMongoDbException("Internal error: wrong operation for getStats variant" + operation); @@ -154,13 +157,15 @@ public class MongoDbProducer extends DefaultProducer { protected void doRemove(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); DBObject removeObj = exchange.getIn().getMandatoryBody(DBObject.class); - + WriteConcern wc = extractWriteConcern(exchange); WriteResult result = wc == null ? dbCol.remove(removeObj) : dbCol.remove(removeObj, wc); - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.remove); - // we always return the WriteResult, because whether the getLastError was called or not, - // the user will have the means to call it or obtain the cached CommandResult + // we always return the WriteResult, because whether the getLastError + // was called or not, + // the user will have the means to call it or obtain the cached + // CommandResult processAndTransferWriteResult(result, exchange); resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN()); } @@ -168,96 +173,106 @@ public class MongoDbProducer extends DefaultProducer { @SuppressWarnings("unchecked") protected void doUpdate(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); - List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>) (Class<?>) List.class); + List<DBObject> saveObj = exchange.getIn().getMandatoryBody((Class<List<DBObject>>)(Class<?>)List.class); if (saveObj.size() != 2) { throw new CamelMongoDbException("MongoDB operation = insert, failed because body is not a List of DBObject objects with size = 2"); } - + DBObject updateCriteria = saveObj.get(0); DBObject objNew = saveObj.get(1); - + Boolean multi = exchange.getIn().getHeader(MongoDbConstants.MULTIUPDATE, Boolean.class); Boolean upsert = exchange.getIn().getHeader(MongoDbConstants.UPSERT, Boolean.class); - + WriteResult result; WriteConcern wc = extractWriteConcern(exchange); - // In API 2.7, the default upsert and multi values of update(DBObject, DBObject) are false, false, so we unconditionally invoke the - // full-signature method update(DBObject, DBObject, boolean, boolean). However, the default behaviour may change in the future, + // In API 2.7, the default upsert and multi values of update(DBObject, + // DBObject) are false, false, so we unconditionally invoke the + // full-signature method update(DBObject, DBObject, boolean, boolean). + // However, the default behaviour may change in the future, // so it's safer to be explicit at this level for full determinism if (multi == null && upsert == null) { - // for update with no multi nor upsert but with specific WriteConcern there is no update signature without multi and upsert args, + // for update with no multi nor upsert but with specific + // WriteConcern there is no update signature without multi and + // upsert args, // so assume defaults result = wc == null ? dbCol.update(updateCriteria, objNew) : dbCol.update(updateCriteria, objNew, false, false, wc); } else { - // we calculate the final boolean values so that if any of these parameters is null, it is resolved to false - result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi)) - : dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc); + // we calculate the final boolean values so that if any of these + // parameters is null, it is resolved to false + result = wc == null ? dbCol.update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi)) : dbCol + .update(updateCriteria, objNew, calculateBooleanValue(upsert), calculateBooleanValue(multi), wc); } - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.update); - // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or + // we always return the WriteResult, because whether the getLastError + // was called or not, the user will have the means to call it or // obtain the cached CommandResult processAndTransferWriteResult(result, exchange); resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN()); } - + protected void doSave(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); DBObject saveObj = exchange.getIn().getMandatoryBody(DBObject.class); - + WriteConcern wc = extractWriteConcern(exchange); WriteResult result = wc == null ? dbCol.save(saveObj) : dbCol.save(saveObj, wc); - + prepareResponseMessage(exchange, MongoDbOperation.save); - // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or + // we always return the WriteResult, because whether the getLastError + // was called or not, the user will have the means to call it or // obtain the cached CommandResult processAndTransferWriteResult(result, exchange); } - + protected void doFindById(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); Object o = exchange.getIn().getMandatoryBody(); DBObject ret; - + DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class); if (fieldFilter == null) { ret = dbCol.findOne(o); } else { ret = dbCol.findOne(o, fieldFilter); } - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.save); resultMessage.setBody(ret); resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) protected void doInsert(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); boolean singleInsert = true; Object insert = exchange.getIn().getBody(DBObject.class); - // body could not be converted to DBObject, check to see if it's of type List<DBObject> + // body could not be converted to DBObject, check to see if it's of type + // List<DBObject> if (insert == null) { insert = exchange.getIn().getBody(List.class); - // if the body of type List was obtained, ensure that all items are of type DBObject and cast the List to List<DBObject> + // if the body of type List was obtained, ensure that all items are + // of type DBObject and cast the List to List<DBObject> if (insert != null) { singleInsert = false; - insert = attemptConvertToList((List) insert, exchange); + insert = attemptConvertToList((List)insert, exchange); } else { throw new CamelMongoDbException("MongoDB operation = insert, Body is not conversible to type DBObject nor List<DBObject>"); } } - + WriteResult result; WriteConcern wc = extractWriteConcern(exchange); if (singleInsert) { - result = wc == null ? dbCol.insert((DBObject) insert) : dbCol.insert((DBObject) insert, wc); + result = wc == null ? dbCol.insert((DBObject)insert) : dbCol.insert((DBObject)insert, wc); } else { - result = wc == null ? dbCol.insert((List<DBObject>) insert) : dbCol.insert((List<DBObject>) insert, wc); + result = wc == null ? dbCol.insert((List<DBObject>)insert) : dbCol.insert((List<DBObject>)insert, wc); } - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.insert); - // we always return the WriteResult, because whether the getLastError was called or not, the user will have the means to call it or + // we always return the WriteResult, because whether the getLastError + // was called or not, the user will have the means to call it or // obtain the cached CommandResult processAndTransferWriteResult(result, exchange); resultMessage.setBody(result); @@ -265,21 +280,23 @@ public class MongoDbProducer extends DefaultProducer { protected void doFindAll(Exchange exchange) throws Exception { DBCollection dbCol = calculateCollection(exchange); - // do not use getMandatoryBody, because if the body is empty we want to retrieve all objects in the collection + // do not use getMandatoryBody, because if the body is empty we want to + // retrieve all objects in the collection DBObject query = null; - // do not run around looking for a type converter unless there is a need for it + // do not run around looking for a type converter unless there is a need + // for it if (exchange.getIn().getBody() != null) { query = exchange.getIn().getBody(DBObject.class); } DBObject fieldFilter = exchange.getIn().getHeader(MongoDbConstants.FIELDS_FILTER, DBObject.class); - + // get the batch size and number to skip Integer batchSize = exchange.getIn().getHeader(MongoDbConstants.BATCH_SIZE, Integer.class); Integer numToSkip = exchange.getIn().getHeader(MongoDbConstants.NUM_TO_SKIP, Integer.class); Integer limit = exchange.getIn().getHeader(MongoDbConstants.LIMIT, Integer.class); DBObject sortBy = exchange.getIn().getHeader(MongoDbConstants.SORT_BY, DBObject.class); DBCursor ret = null; - try { + try { if (query == null && fieldFilter == null) { ret = dbCol.find(new BasicDBObject()); } else if (fieldFilter == null) { @@ -287,28 +304,28 @@ public class MongoDbProducer extends DefaultProducer { } else { ret = dbCol.find(query, fieldFilter); } - + if (sortBy != null) { ret.sort(sortBy); } - + if (batchSize != null) { ret.batchSize(batchSize.intValue()); } - + if (numToSkip != null) { ret.skip(numToSkip.intValue()); } - + if (limit != null) { ret.limit(limit.intValue()); } - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findAll); resultMessage.setBody(ret.toArray()); resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count()); resultMessage.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size()); - + } catch (Exception e) { // rethrow the exception throw e; @@ -318,7 +335,7 @@ public class MongoDbProducer extends DefaultProducer { ret.close(); } } - + } protected void doFindOneByQuery(Exchange exchange) throws Exception { @@ -332,7 +349,7 @@ public class MongoDbProducer extends DefaultProducer { } else { ret = dbCol.findOne(o, fieldFilter); } - + Message resultMessage = prepareResponseMessage(exchange, MongoDbOperation.findOneByQuery); resultMessage.setBody(ret); resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1); @@ -405,7 +422,9 @@ public class MongoDbProducer extends DefaultProducer { } dbCol = dynamicCollection == null ? db.getCollection(endpoint.getCollection()) : db.getCollection(dynamicCollection); - LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Dynamic database and/or collection selected: {}->{}", dbCol.getDB().getName(), dbCol.getName()); + } return dbCol; } @@ -423,18 +442,19 @@ public class MongoDbProducer extends DefaultProducer { exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException())); } } - - // determine where to set the WriteResult: as the OUT body or as an IN message header + + // determine where to set the WriteResult: as the OUT body or as an IN + // message header if (endpoint.isWriteResultAsHeader()) { exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result); } else { exchange.getOut().setBody(result); } } - + private WriteConcern extractWriteConcern(Exchange exchange) throws CamelMongoDbException { Object o = exchange.getIn().getHeader(MongoDbConstants.WRITECONCERN); - + if (o == null) { return null; } else if (o instanceof WriteConcern) { @@ -442,16 +462,16 @@ public class MongoDbProducer extends DefaultProducer { } else if (o instanceof String) { WriteConcern answer = WriteConcern.valueOf(ObjectHelper.cast(String.class, o)); if (answer == null) { - throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN - + " header, with value " + o + " could not be resolved to a WriteConcern type"); + throw new CamelMongoDbException("WriteConcern specified in the " + MongoDbConstants.WRITECONCERN + " header, with value " + o + + " could not be resolved to a WriteConcern type"); } } - + // should never get here LOG.warn("A problem occurred while resolving the Exchange's Write Concern"); return null; } - + @SuppressWarnings("rawtypes") private List<DBObject> attemptConvertToList(List insertList, Exchange exchange) throws CamelMongoDbException { List<DBObject> dbObjectList = new ArrayList<DBObject>(insertList.size()); @@ -466,7 +486,7 @@ public class MongoDbProducer extends DefaultProducer { } return dbObjectList; } - + private Message prepareResponseMessage(Exchange exchange, MongoDbOperation operation) { Message answer = exchange.getOut(); MessageHelper.copyHeaders(exchange.getIn(), answer, false); @@ -475,9 +495,9 @@ public class MongoDbProducer extends DefaultProducer { } return answer; } - + private boolean isWriteOperation(MongoDbOperation operation) { return MongoDbComponent.WRITE_OPERATIONS.contains(operation); } - + } http://git-wip-us.apache.org/repos/asf/camel/blob/d5f42992/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java ---------------------------------------------------------------------- diff --git a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java new file mode 100644 index 0000000..25468f5 --- /dev/null +++ b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbIndexTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.mongodb; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.WriteResult; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class MongoDbIndexTest extends AbstractMongoDbTest { + + @Test + public void testInsertDynamicityEnabledDBAndCollectionAndIndex() { + assertEquals(0, testCollection.count()); + mongo.getDB("otherDB").dropDatabase(); + db.getCollection("otherCollection").drop(); + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + + String body = "{\"_id\": \"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\", \"b\" : \"2\"}"; + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(MongoDbConstants.DATABASE, "otherDB"); + headers.put(MongoDbConstants.COLLECTION, "otherCollection"); + + List<DBObject> objIndex = new ArrayList<DBObject>(); + DBObject index1 = new BasicDBObject(); + index1.put("a", 1); + DBObject index2 = new BasicDBObject(); + index2.put("b", -1); + objIndex.add(index1); + objIndex.add(index2); + headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex); + + Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers); + + assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass()); + + DBCollection dynamicCollection = mongo.getDB("otherDB").getCollection("otherCollection"); + + List<DBObject> indexInfos = dynamicCollection.getIndexInfo(); + + BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key"); + BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key"); + + assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a"))); + assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b"))); + + DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledDBAndCollection"); + assertNotNull("No record with 'testInsertDynamicityEnabledDBAndCollection' _id", b); + + b = testCollection.findOne("testInsertDynamicityEnabledDBOnly"); + assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b); + + assertTrue("The otherDB database should exist", mongo.getDatabaseNames().contains("otherDB")); + } + + @Test + public void testInsertDynamicityEnabledCollectionAndIndex() { + assertEquals(0, testCollection.count()); + mongo.getDB("otherDB").dropDatabase(); + db.getCollection("otherCollection").drop(); + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + + String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionAndIndex\", \"a\" : \"1\", \"b\" : \"2\"}"; + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(MongoDbConstants.COLLECTION, "otherCollection"); + + List<DBObject> objIndex = new ArrayList<DBObject>(); + DBObject index1 = new BasicDBObject(); + index1.put("a", 1); + DBObject index2 = new BasicDBObject(); + index2.put("b", -1); + objIndex.add(index1); + objIndex.add(index2); + headers.put(MongoDbConstants.COLLECTION_INDEX, objIndex); + + Object result = template.requestBodyAndHeaders("direct:dynamicityEnabled", body, headers); + + assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass()); + + DBCollection dynamicCollection = db.getCollection("otherCollection"); + + List<DBObject> indexInfos = dynamicCollection.getIndexInfo(); + + BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key"); + BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key"); + + assertTrue("No index on the field a", key1.containsField("a") && "1".equals(key1.getString("a"))); + assertTrue("No index on the field b", key2.containsField("b") && "-1".equals(key2.getString("b"))); + + DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionAndIndex"); + assertNotNull("No record with 'testInsertDynamicityEnabledCollectionAndIndex' _id", b); + + b = testCollection.findOne("testInsertDynamicityEnabledDBOnly"); + assertNull("There is a record with 'testInsertDynamicityEnabledDBAndCollection' _id in the test collection", b); + + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + } + + @Test + public void testInsertDynamicityEnabledCollectionOnlyAndURIIndex() { + assertEquals(0, testCollection.count()); + mongo.getDB("otherDB").dropDatabase(); + db.getCollection("otherCollection").drop(); + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + + String body = "{\"_id\": \"testInsertDynamicityEnabledCollectionOnlyAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}"; + Map<String, Object> headers = new HashMap<String, Object>(); + headers.put(MongoDbConstants.COLLECTION, "otherCollection"); + + Object result = template.requestBodyAndHeaders("direct:dynamicityEnabledWithIndexUri", body, headers); + + assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass()); + + DBCollection dynamicCollection = db.getCollection("otherCollection"); + List<DBObject> indexInfos = dynamicCollection.getIndexInfo(); + + BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key"); + + assertFalse("No index on the field a", key1.containsField("a") && "-1".equals(key1.getString("a"))); + + DBObject b = dynamicCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex"); + assertNotNull("No record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id", b); + + b = testCollection.findOne("testInsertDynamicityEnabledCollectionOnlyAndURIIndex"); + assertNull("There is a record with 'testInsertDynamicityEnabledCollectionOnlyAndURIIndex' _id in the test collection", b); + + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + } + + @Test + public void testInsertAutoCreateCollectionAndURIIndex() { + assertEquals(0, testCollection.count()); + db.getCollection("otherCollection").remove(new BasicDBObject()); + + String body = "{\"_id\": \"testInsertAutoCreateCollectionAndURIIndex\", \"a\" : \"1\", \"b\" : \"2\"}"; + Map<String, Object> headers = new HashMap<String, Object>(); + + Object result = template.requestBodyAndHeaders("direct:dynamicityDisabled", body, headers); + assertEquals("Response isn't of type WriteResult", WriteResult.class, result.getClass()); + + DBCollection collection = db.getCollection("otherCollection"); + List<DBObject> indexInfos = collection.getIndexInfo(); + + BasicDBObject key1 = (BasicDBObject) indexInfos.get(1).get("key"); + BasicDBObject key2 = (BasicDBObject) indexInfos.get(2).get("key"); + + assertTrue("No index on the field b", key1.containsField("b") && "-1".equals(key1.getString("b"))); + assertTrue("No index on the field a", key2.containsField("a") && "1".equals(key2.getString("a"))); + + DBObject b = collection.findOne("testInsertAutoCreateCollectionAndURIIndex"); + assertNotNull("No record with 'testInsertAutoCreateCollectionAndURIIndex' _id", b); + + b = testCollection.findOne("testInsertAutoCreateCollectionAndURIIndex"); + assertNull("There is a record with 'testInsertAutoCreateCollectionAndURIIndex' _id in the test collection", b); + + assertFalse("The otherDB database should not exist", mongo.getDatabaseNames().contains("otherDB")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:dynamicityEnabled") + .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE"); + from("direct:dynamicityEnabledWithIndexUri") + .to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&collectionIndex={\"a\":\"1\"}&operation=insert&dynamicity=true&writeConcern=SAFE"); + from("direct:dynamicityDisabled") + .to("mongodb:myDb?database={{mongodb.testDb}}&collection=otherCollection&collectionIndex={\"a\":\"1\",\"b\":\"-1\"}&operation=insert&dynamicity=false&writeConcern=SAFE"); + } + }; + } +}