Repository: camel
Updated Branches:
  refs/heads/master b20726641 -> 7a1bf6922


Added support for oplog tracking. At the same time added interface for 
extending the strategy of extraction of the tailTrackingIncreasingField and 
creation of the cursor used to track the tail or a collection.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cf2a11f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cf2a11f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cf2a11f

Branch: refs/heads/master
Commit: 3cf2a11fcae1f8502d425780039fa3eda9d23663
Parents: b207266
Author: gilfernandes <gil.fernan...@gmail.com>
Authored: Fri Jan 6 19:18:28 2017 +0000
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Sat Jan 7 10:52:28 2017 +0100

----------------------------------------------------------------------
 components/camel-mongodb/pom.xml                |   6 +
 .../src/main/docs/mongodb-component.adoc        | 122 ++++++++++++++++++-
 .../mongodb/MongoDBTailTrackingEnum.java        |  52 ++++++++
 .../mongodb/MongoDBTailTrackingStrategy.java    |  42 +++++++
 .../component/mongodb/MongoDbEndpoint.java      |  43 ++++++-
 .../mongodb/MongoDbTailTrackingConfig.java      |  34 +++---
 .../mongodb/MongoDbTailTrackingManager.java     |  31 +++--
 .../mongodb/MongoDbTailingProcess.java          |   9 +-
 .../MongoDBTailTrackingStrategyTest.java        |  75 ++++++++++++
 9 files changed, 376 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/pom.xml b/components/camel-mongodb/pom.xml
index 8f04d97..81f3045 100644
--- a/components/camel-mongodb/pom.xml
+++ b/components/camel-mongodb/pom.xml
@@ -83,6 +83,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>de.flapdoodle.embed</groupId>
       <artifactId>de.flapdoodle.embed.mongo</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/docs/mongodb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-mongodb/src/main/docs/mongodb-component.adoc 
b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
index ca4dd5a..b63b4e7 100644
--- a/components/camel-mongodb/src/main/docs/mongodb-component.adoc
+++ b/components/camel-mongodb/src/main/docs/mongodb-component.adoc
@@ -66,7 +66,7 @@ The MongoDB component has no options.
 
 
 // endpoint options: START
-The MongoDB component supports 22 endpoint options which are listed below:
+The MongoDB component supports 24 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -85,15 +85,17 @@ The MongoDB component supports 22 endpoint options which 
are listed below:
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the 
exchange pattern when the consumer creates an exchange.
 | cursorRegenerationDelay | advanced | 1000 | long | MongoDB tailable cursors 
will block until new data arrives. If no new data is inserted after some time 
the cursor will be automatically freed and closed by the MongoDB server. The 
client is expected to regenerate the cursor if needed. This value specifies the 
time to wait before attempting to fetch a new cursor and if the attempt fails 
how long before the next attempt is made. Default value is 1000ms.
 | dynamicity | advanced | false | boolean | Sets whether this endpoint will 
attempt to dynamically resolve the target database and collection from the 
incoming Exchange properties. Can be used to override at runtime the database 
and collection specified on the otherwise static endpoint URI. It is disabled 
by default to boost performance. Enabling it will take a minimal performance 
hit.
-| readPreference | advanced |  | ReadPreference | Sets a MongoDB 
ReadPreference on the Mongo connection. Read preferences set directly on the 
connection will be overridden by this setting. The link 
com.mongodb.ReadPreferencevalueOf(String) utility method is used to resolve the 
passed readPreference value. Some examples for the possible values are nearest 
primary or secondary etc.
+| readPreference | advanced |  | ReadPreference | Sets a MongoDB 
ReadPreference on the Mongo connection. Read preferences set directly on the 
connection will be overridden by this setting. The link 
ReadPreferencevalueOf(String) utility method is used to resolve the passed 
readPreference value. Some examples for the possible values are nearest primary 
or secondary etc.
 | synchronous | advanced | false | boolean | Sets whether synchronous 
processing should be strictly used or Camel is allowed to use asynchronous 
processing (if supported).
 | writeResultAsHeader | advanced | false | boolean | In write operations it 
determines whether instead of returning WriteResult as the body of the OUT 
message we transfer the IN message to the OUT and attach the WriteResult as a 
header.
 | persistentId | tail |  | String | One tail tracking collection can host many 
trackers for several tailable consumers. To keep them separate each tracker 
should have its own unique persistentId.
 | persistentTailTracking | tail | false | boolean | Enable persistent tail 
tracking which is a mechanism to keep track of the last consumed message across 
system restarts. The next time the system is up the endpoint will recover the 
cursor from the point where it last stopped slurping records.
+| persistRecords | tail | -1 | int | Sets the number of tailed records after 
which the tail tracking data is persisted to MongoDB.
 | tailTrackCollection | tail |  | String | Collection where tail tracking 
information will be persisted. If not specified link 
MongoDbTailTrackingConfigDEFAULT_COLLECTION will be used by default.
 | tailTrackDb | tail |  | String | Indicates what database the tail tracking 
mechanism will persist to. If not specified the current database will be picked 
by default. Dynamicity will not be taken into account even if enabled i.e. the 
tail tracking database will not vary past endpoint initialisation.
 | tailTrackField | tail |  | String | Field where the last tracked value will 
be placed. If not specified link MongoDbTailTrackingConfigDEFAULT_FIELD will be 
used by default.
 | tailTrackIncreasingField | tail |  | String | Correlation field in the 
incoming record which is of increasing nature and will be used to position the 
tailing cursor every time it is generated. The cursor will be (re)created with 
a query of type: tailTrackIncreasingField lastValue (possibly recovered from 
persistent tail tracking). Can be of type Integer Date String etc. NOTE: No 
support for dot notation at the current time so the field should be at the top 
level of the document.
+| tailTrackingStrategy | tail | LITERAL | MongoDBTailTrackingEnum | Sets the 
strategy used to extract the increasing field value and to create the query to 
position the tail cursor.
 |=======================================================================
 {% endraw %}
 // endpoint options: END
@@ -812,6 +814,122 @@ 
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasing
     .to("mock:test");
 
-----------------------------------------------------------------------------------------------------------------------------------
 
+[[MongoDB-TailtrackingOnOplog]]
+Oplog Tail Tracking
+^^^^^^^^^^^^^^^^^^^
+
+The *oplog* collection tracking feature allows to implement trigger like 
functionality in MongoDB.
+In order to activate this collection you will have first to activate a replica 
set. For more
+information on this topic please check 
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/ .
+
+Below you can find an example of a Java DSL based route demonstrating how you 
can use the component to track the *oplog*
+collection. In this specific case we are filtering the events which affect a 
collection *customers* in
+database *optlog_test*. Note that the `tailTrackIncreasingField` is a 
timestamp field ('ts') which implies
+that you have to use the `tailTrackingStrategy` parameter with the *TIMESTAMP* 
value.
+
+[source,java]
+-----------------------------------------------------------------------------------------------------------------------------------
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mongodb.MongoDBTailTrackingEnum;
+import org.apache.camel.main.Main;
+
+import java.io.InputStream;
+
+/**
+ * For this to work you need to turn on the replica set
+ * <p>
+ * Commands to create a replica set:
+ * <p>
+ * rs.initiate( {
+ * _id : "rs0",
+ * members: [ { _id : 0, host : "localhost:27017" } ]
+ * })
+ */
+public class MongoDbTracker {
+
+    private final String database;
+
+    private final String collection;
+
+    private final String increasingField;
+
+    private MongoDBTailTrackingEnum trackingStrategy;
+
+    private int persistRecords = -1;
+
+    private boolean persistenTailTracking;
+
+    public MongoDbTracker(String database, String collection, String 
increasingField) {
+        this.database = database;
+        this.collection = collection;
+        this.increasingField = increasingField;
+    }
+
+    public static void main(String[] args) throws Exception {
+        final MongoDbTracker mongoDbTracker = new MongoDbTracker("local", 
"oplog.rs", "ts");
+        mongoDbTracker.setTrackingStrategy(MongoDBTailTrackingEnum.TIMESTAMP);
+        mongoDbTracker.setPersistRecords(5);
+        mongoDbTracker.setPersistenTailTracking(true);
+        mongoDbTracker.startRouter();
+        // run until you terminate the JVM
+        System.out.println("Starting Camel. Use ctrl + c to terminate the 
JVM.\n");
+
+    }
+
+    public void setTrackingStrategy(MongoDBTailTrackingEnum trackingStrategy) {
+        this.trackingStrategy = trackingStrategy;
+    }
+
+    public void setPersistRecords(int persistRecords) {
+        this.persistRecords = persistRecords;
+    }
+
+    public void setPersistenTailTracking(boolean persistenTailTracking) {
+        this.persistenTailTracking = persistenTailTracking;
+    }
+
+    void startRouter() throws Exception {
+        // create a Main instance
+        Main main = new Main();
+        main.bind(MongoConstants.CONN_NAME, new MongoClient("localhost", 
27017));
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
getContext().getTypeConverterRegistry().addTypeConverter(InputStream.class, 
BasicDBObject.class,
+                        new MongoToInputStreamConverter());
+                from("mongodb://" + MongoConstants.CONN_NAME + "?database=" + 
database
+                        + "&collection=" + collection
+                        + "&persistentTailTracking=" + persistenTailTracking
+                        + "&persistentId=trackerName" + "&tailTrackDb=local"
+                        + "&tailTrackCollection=talendTailTracking"
+                        + "&tailTrackField=lastTrackingValue"
+                        + "&tailTrackIncreasingField=" + increasingField
+                        + "&tailTrackingStrategy=" + 
trackingStrategy.toString()
+                        + "&persistRecords=" + persistRecords
+                        + "&cursorRegenerationDelay=1000")
+                        
.filter().jsonpath("$[?(@.ns=='optlog_test.customers')]")
+                        .id("logger")
+                        .to("log:logger?level=WARN")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                Message message = exchange.getIn();
+                                
System.out.println(message.getBody().toString());
+                                
exchange.getOut().setBody(message.getBody().toString());
+                            }
+                        });
+            }
+        });
+        main.run();
+    }
+}
+-----------------------------------------------------------------------------------------------------------------------------------
+
+
 [[MongoDB-Typeconversions]]
 Type conversions
 ~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
new file mode 100644
index 0000000..65d5797
--- /dev/null
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingEnum.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import org.bson.types.BSONTimestamp;
+
+/**
+ * Contains the concrete MongoDB tail tracking strategies.
+ */
+public enum MongoDBTailTrackingEnum implements MongoDBTailTrackingStrategy {
+
+    TIMESTAMP {
+
+        @Override
+        public Object extractLastVal(DBObject o, String increasingField) {
+            Object temp = o.get(increasingField);
+            return ((BSONTimestamp) temp).getTime();
+        }
+
+        @Override
+        public BasicDBObject createQuery(Object lastVal, String 
increasingField) {
+            return new BasicDBObject(increasingField, new BasicDBObject("$gt", 
new BSONTimestamp((Integer)lastVal, 1)));
+        }
+    }, LITERAL {
+
+        @Override
+        public Object extractLastVal(DBObject o, String increasingField) {
+            return o.get(increasingField);
+        }
+
+        @Override
+        public BasicDBObject createQuery(Object lastVal, String 
increasingField) {
+            return new BasicDBObject(increasingField, new BasicDBObject("$gt", 
lastVal));
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
new file mode 100644
index 0000000..18a634c
--- /dev/null
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+
+/**
+ * This class contains different methods for extracting and saving tail 
tracking information.
+ */
+public interface MongoDBTailTrackingStrategy {
+
+    /**
+     * Extracts the last tracking value using the field name or an expression.
+     * @param o The object retrieved by the trailing process.
+     * @param increasingField The field name or an expression used to extract 
the last value.
+     * @return an object representing the last tracking value in a MongoDB 
collection.
+     */
+    Object extractLastVal(DBObject o, String increasingField);
+
+    /**
+     * Creates an object to be used in a query using the last tracking value.
+     * @param lastVal The last tracking value.
+     * @param increasingField The field name or an expression used to extract 
the last value.
+     * @return the object to be used in a MongoDB query.
+     */
+    BasicDBObject createQuery(Object lastVal, String increasingField);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 4ce84c7..15cc261 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -58,7 +58,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoDbEndpoint.class);
 
     private MongoClient mongoConnection;
-    
+
     @UriPath @Metadata(required = "true")
     private String connectionBean;
     @UriParam
@@ -103,6 +103,12 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     @UriParam
     private MongoDbOutputType outputType;
 
+    @UriParam(label = "tail", defaultValue = "LITERAL")
+    private MongoDBTailTrackingEnum tailTrackingStrategy;
+
+    @UriParam(label = "tail", defaultValue = "-1")
+    private int persistRecords;
+
     private MongoDatabase mongoDatabase;
     private MongoCollection<BasicDBObject> mongoCollection;
 
@@ -182,6 +188,9 @@ public class MongoDbEndpoint extends DefaultEndpoint {
                 if (persistentTailTracking && 
(ObjectHelper.isEmpty(persistentId))) {
                     throw new IllegalArgumentException("persistentId is 
compulsory for persistent tail tracking");
                 }
+                if (persistentTailTracking && 
(ObjectHelper.isEmpty(persistentId))) {
+                    throw new IllegalArgumentException("persistentId is 
compulsory for persistent tail tracking");
+                }
             }
 
         } else {
@@ -290,7 +299,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         setWriteReadOptionsOnConnection();
         super.doStart();
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -460,7 +469,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
      * Sets a MongoDB {@link ReadPreference} on the Mongo connection. Read 
preferences set directly on the connection will be
      * overridden by this setting.
      * <p/>
-     * The {@link com.mongodb.ReadPreference#valueOf(String)} utility method 
is used to resolve the passed {@code readPreference}
+     * The {@link ReadPreference#valueOf(String)} utility method is used to 
resolve the passed {@code readPreference}
      * value. Some examples for the possible values are {@code nearest}, 
{@code primary} or {@code secondary} etc.
      * 
      * @param readPreference the name of the read preference to set
@@ -586,7 +595,7 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoDbTailTrackingConfig getTailTrackingConfig() {
         if (tailTrackingConfig == null) {
             tailTrackingConfig = new 
MongoDbTailTrackingConfig(persistentTailTracking, tailTrackIncreasingField, 
tailTrackDb == null ? database : tailTrackDb, tailTrackCollection,
-                    tailTrackField, getPersistentId());
+                    tailTrackField, getPersistentId(), tailTrackingStrategy);
         }
         return tailTrackingConfig;
     }
@@ -655,4 +664,30 @@ public class MongoDbEndpoint extends DefaultEndpoint {
     public MongoCollection<BasicDBObject> getMongoCollection() {
         return mongoCollection;
     }
+
+    public MongoDBTailTrackingEnum getTailTrackingStrategy() {
+        return tailTrackingStrategy;
+    }
+
+    /**
+     * Sets the strategy used to extract the increasing field value and to 
create the query to position the
+     * tail cursor.
+     * @param tailTrackingStrategy The strategy used to extract the increasing 
field value and to create the query to position the
+     * tail cursor.
+     */
+    public void setTailTrackingStrategy(MongoDBTailTrackingEnum 
tailTrackingStrategy) {
+        this.tailTrackingStrategy = tailTrackingStrategy;
+    }
+
+    public int getPersistRecords() {
+        return persistRecords;
+    }
+
+    /**
+     * Sets the number of tailed records after which the tail tracking data is 
persisted to MongoDB.
+     * @param persistRecords The number of tailed records after which the tail 
tracking data is persisted to MongoDB.
+     */
+    public void setPersistRecords(int persistRecords) {
+        this.persistRecords = persistRecords;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
index 7b998f5..c446d4d 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingConfig.java
@@ -18,17 +18,9 @@ package org.apache.camel.component.mongodb;
 
 public class MongoDbTailTrackingConfig {
     
-    public static final String DEFAULT_COLLECTION = "camelTailTracking";
-    public static final String DEFAULT_FIELD = "lastTrackingValue";
-    
-    /**
-     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
-     */
-    public final String increasingField;
-    /**
-     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
-     */
-    public final boolean persistent;
+    static final String DEFAULT_COLLECTION = "camelTailTracking";
+    static final String DEFAULT_FIELD = "lastTrackingValue";
+
     /**
      * See {@link MongoDbEndpoint#setTailTrackDb(String)}
      */
@@ -38,21 +30,35 @@ public class MongoDbTailTrackingConfig {
      */
     public final String collection;
     /**
+     * See {@link MongoDbEndpoint#setTailTrackIncreasingField(String)}
+     */
+    final String increasingField;
+    /**
+     * See {@link MongoDbEndpoint#setPersistentTailTracking(boolean)}
+     */
+    final boolean persistent;
+    /**
      * See {@link MongoDbEndpoint#setTailTrackField(String)}
      */
-    public final String field;
+    final String field;
     /**
      * See {@link MongoDbEndpoint#setPersistentId(String)}
      */
-    public final String persistentId;
+    final String persistentId;
+
+    /**
+     * See {@link 
MongoDbEndpoint#setTailTrackingStrategy(MongoDBTailTrackingEnum)}
+     */
+    final MongoDBTailTrackingEnum mongoDBTailTrackingStrategy;
     
     public MongoDbTailTrackingConfig(boolean persistentTailTracking, String 
tailTrackIncreasingField, String tailTrackDb,
-            String tailTrackCollection, String tailTrackField, String 
persistentId) {
+            String tailTrackCollection, String tailTrackField, String 
persistentId, MongoDBTailTrackingEnum mongoDBTailTrackingStrategy) {
         this.increasingField = tailTrackIncreasingField;
         this.persistent = persistentTailTracking;
         this.db = tailTrackDb;
         this.persistentId = persistentId;
         this.collection = tailTrackCollection == null ? 
MongoDbTailTrackingConfig.DEFAULT_COLLECTION : tailTrackCollection;
         this.field = tailTrackField == null ? 
MongoDbTailTrackingConfig.DEFAULT_FIELD : tailTrackField;
+        this.mongoDBTailTrackingStrategy = mongoDBTailTrackingStrategy == null 
? MongoDBTailTrackingEnum.LITERAL : mongoDBTailTrackingStrategy;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
index 7e74a7e..68cf83a 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailTrackingManager.java
@@ -20,31 +20,29 @@ import com.mongodb.BasicDBObject;
 import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
-
+import org.bson.types.BSONTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MongoDbTailTrackingManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoDbTailTrackingManager.class);
-    
     public Object lastVal;
-
     private final MongoClient connection;
     private final MongoDbTailTrackingConfig config;
     private MongoCollection<BasicDBObject> dbCol;
     private BasicDBObject trackingObj;
-    
+
     public MongoDbTailTrackingManager(MongoClient connection, 
MongoDbTailTrackingConfig config) {
         this.connection = connection;
         this.config = config;
     }
-    
+
     public void initialize() throws Exception {
         if (!config.persistent) {
             return;
         }
-        
+
         dbCol = 
connection.getDatabase(config.db).getCollection(config.collection, 
BasicDBObject.class);
         BasicDBObject filter = new BasicDBObject("persistentId", 
config.persistentId);
         trackingObj = dbCol.find(filter).first();
@@ -55,43 +53,42 @@ public class MongoDbTailTrackingManager {
         // keep only the _id, the rest is useless and causes more overhead 
during update
         trackingObj = new BasicDBObject("_id", trackingObj.get("_id"));
     }
-    
+
     public synchronized void persistToStore() {
         if (!config.persistent || lastVal == null) {
             return;
         }
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Persisting lastVal={} to store, collection: {}", 
lastVal, config.collection);
         }
-        
+
         BasicDBObject updateObj = new BasicDBObject().append("$set", new 
BasicDBObject(config.field, lastVal));
         dbCol.updateOne(trackingObj, updateObj);
         trackingObj = dbCol.find().first();
     }
-    
+
     public synchronized Object recoverFromStore() {
         if (!config.persistent) {
             return null;
         }
-        
+
         lastVal = dbCol.find(trackingObj).first().get(config.field);
-        
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Recovered lastVal={} from store, collection: {}", 
lastVal, config.collection);
         }
-        
+
         return lastVal;
     }
-    
+
     public void setLastVal(DBObject o) {
         if (config.increasingField == null) {
             return;
         }
-        
-        lastVal = o.get(config.increasingField);
+        lastVal = config.mongoDBTailTrackingStrategy.extractLastVal(o, 
config.increasingField);
     }
-    
+
     public String getIncreasingFieldName() {
         return config.increasingField;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
index 4c5fb8a..2a1d574 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingProcess.java
@@ -151,6 +151,9 @@ public class MongoDbTailingProcess implements Runnable {
      * The heart of the tailing process.
      */
     private void doRun() {
+        int counter = 0;
+        int persistRecords = endpoint.getPersistRecords();
+        boolean persistRegularly = persistRecords > 0;
         // while the cursor has more values, keepRunning is true and the 
cursorId is not 0, which symbolizes that the cursor is dead
         try {
             while (cursor.hasNext() && keepRunning) { //cursor.getCursorId() 
!= 0 &&
@@ -165,6 +168,9 @@ public class MongoDbTailingProcess implements Runnable {
                     // do nothing
                 }
                 tailTracking.setLastVal(dbObj);
+                if (persistRegularly && counter++ % persistRecords == 0) {
+                    tailTracking.persistToStore();
+                }
             }
         } catch (MongoCursorNotFoundException e) {
             // we only log the warning if we are not stopping, otherwise it is 
expected because the stop() method kills the cursor just in case it is blocked
@@ -187,7 +193,8 @@ public class MongoDbTailingProcess implements Runnable {
         if (lastVal == null) {
             answer = 
dbCol.find().cursorType(CursorType.TailableAwait).iterator();
         } else {
-            BasicDBObject queryObj = new 
BasicDBObject(tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", 
lastVal));
+            final String increasingFieldName = 
tailTracking.getIncreasingFieldName();
+            BasicDBObject queryObj = 
endpoint.getTailTrackingStrategy().createQuery(lastVal, increasingFieldName);
             answer = 
dbCol.find(queryObj).cursorType(CursorType.TailableAwait).iterator();
         }
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/3cf2a11f/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
new file mode 100644
index 0000000..2534701
--- /dev/null
+++ 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDBTailTrackingStrategyTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import org.bson.types.BSONTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MongoDBTailTrackingStrategyTest {
+
+    private static final String INCREASING_FIELD_NAME = "ts";
+
+    @Test
+    public void testExtractLastValForLiterals() throws Exception {
+        int expected = 1483701465;
+        DBObject o = mock(DBObject.class);
+        when(o.get(INCREASING_FIELD_NAME)).thenReturn(expected);
+        Object lastVal = MongoDBTailTrackingEnum.LITERAL.extractLastVal(o, 
INCREASING_FIELD_NAME);
+        assertThat(lastVal, is(expected));
+    }
+
+    @Test
+    public void testCreateQueryForLiterals() {
+        Integer lastVal = 1483701465;
+        BasicDBObject basicDBObject = 
MongoDBTailTrackingEnum.LITERAL.createQuery(lastVal, INCREASING_FIELD_NAME);
+        final Object actual = basicDBObject.get(INCREASING_FIELD_NAME);
+        assertThat(actual, is(notNullValue()));
+        assertThat(actual instanceof BasicDBObject, is(true));
+        assertThat(((BasicDBObject)actual).get("$gt"), is(lastVal));
+    }
+
+    @Test
+    public void testExtractLastValForTimestamp() throws Exception {
+        DBObject o = mock(DBObject.class);
+        final int lastVal = 1483701465;
+        when(o.get(INCREASING_FIELD_NAME)).thenReturn(new 
BSONTimestamp(lastVal, 1));
+        Object res = MongoDBTailTrackingEnum.TIMESTAMP.extractLastVal(o, 
INCREASING_FIELD_NAME);
+        assertThat(res, is(lastVal));
+    }
+
+    @Test
+    public void testExtracCreateQueryForTimestamp() throws Exception {
+        final int lastVal = 1483701465;
+        BasicDBObject basicDBObject = 
MongoDBTailTrackingEnum.TIMESTAMP.createQuery(lastVal, INCREASING_FIELD_NAME);
+        final Object actual = basicDBObject.get(INCREASING_FIELD_NAME);
+        assertThat(actual, is(notNullValue()));
+        assertThat(actual instanceof BasicDBObject, is(true));
+        assertThat(((BasicDBObject)actual).get("$gt") instanceof 
BSONTimestamp, is(true));
+        BSONTimestamp bsonTimestamp = (BSONTimestamp) 
((BasicDBObject)actual).get("$gt");
+        assertThat(bsonTimestamp.getTime(), is(lastVal));
+    }
+
+
+}
\ No newline at end of file

Reply via email to