[CAMEL-9659] Add different strategies for handling the detection of new files
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45204104
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45204104
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45204104

Branch: refs/heads/master
Commit: 452041047e770c24e8802f2cac7cc76080554303
Parents: ce54b04
Author: Daniel Kulp <[email protected]>
Authored: Tue Mar 1 11:48:43 2016 -0500
Committer: Daniel Kulp <[email protected]>
Committed: Tue Mar 1 14:23:07 2016 -0500

----------------------------------------------------------------------
 .../camel/component/gridfs/GridFsConsumer.java  | 98 ++++++++++++++++----
 .../camel/component/gridfs/GridFsEndpoint.java  | 48 +++++++++-
 .../component/gridfs/GridFsConsumerTest.java    | 36 ++++++-
 3 files changed, 157 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java b/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
index 240dd47..35d77ee 100644
--- a/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
+++ b/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
@@ -23,13 +23,17 @@ import java.io.InputStream;
 import java.util.concurrent.ExecutorService;
 
 import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
+import com.mongodb.MongoException;
 import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.util.JSON;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.gridfs.GridFsEndpoint.QueryStrategy;
 import org.apache.camel.impl.DefaultConsumer;
 
 /**
@@ -48,8 
+52,6 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable { this.endpoint = endpoint; } - - @Override protected void doStop() throws Exception { super.doStop(); @@ -69,7 +71,38 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable { @Override public void run() { DBCursor c = null; - java.util.Date fromDate = new java.util.Date(); + java.util.Date fromDate = null; + + QueryStrategy s = endpoint.getQueryStrategy(); + boolean usesTimestamp = (s != QueryStrategy.FileAttribute); + boolean persistsTimestamp = (s == QueryStrategy.PersistentTimestamp || s == QueryStrategy.PersistentTimestampAndFileAttribute); + boolean usesAttribute = (s == QueryStrategy.FileAttribute + || s == QueryStrategy.TimeStampAndFileAttribute + || s == QueryStrategy.PersistentTimestampAndFileAttribute); + + DBCollection ptsCollection = null; + DBObject persistentTimestamp = null; + if (persistsTimestamp) { + ptsCollection = endpoint.getDB().getCollection(endpoint.getPersistentTSCollection()); + // ensure standard indexes as long as collections are small + try { + if (ptsCollection.count() < 1000) { + ptsCollection.createIndex(new BasicDBObject("id", 1)); + } + } catch (MongoException e) { + //TODO: Logging + } + persistentTimestamp = ptsCollection.findOne(new BasicDBObject("id", endpoint.getPersistentTSObject())); + if (persistentTimestamp == null) { + persistentTimestamp = new BasicDBObject("id", endpoint.getPersistentTSObject()); + fromDate = new java.util.Date(); + persistentTimestamp.put("timestamp", fromDate); + ptsCollection.save(persistentTimestamp); + } + fromDate = (java.util.Date)persistentTimestamp.get("timestamp"); + } else if (usesTimestamp) { + fromDate = new java.util.Date(); + } try { Thread.sleep(endpoint.getInitialDelay()); while (isStarted()) { @@ -84,27 +117,54 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable { } else { query = (DBObject) JSON.parse(queryString); } - - query.put("uploadDate", new 
BasicDBObject("$gte", fromDate)); + if (usesTimestamp) { + query.put("uploadDate", new BasicDBObject("$gt", fromDate)); + } + if (usesAttribute) { + query.put(endpoint.getFileAttributeName(), null); + } c = endpoint.getFilesCollection().find(query); - fromDate = new java.util.Date(); } + boolean dateModified = false; while (c.hasNext() && isStarted()) { GridFSDBFile file = (GridFSDBFile)c.next(); - file = endpoint.getGridFs().findOne(new BasicDBObject("_id", file.getId())); - - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, JSON.serialize(file.getMetaData())); - exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, file.getContentType()); - exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength()); - exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate()); - exchange.getIn().setBody(file.getInputStream(), InputStream.class); - try { - getProcessor().process(exchange); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + GridFSDBFile forig = file; + if (usesAttribute) { + file.put(endpoint.getFileAttributeName(), "processing"); + DBObject q = BasicDBObjectBuilder.start("_id", file.getId()).append("camel-processed", null).get(); + forig = (GridFSDBFile)endpoint.getFilesCollection().findAndModify(q, null, null, false, file, true, false); } + if (forig != null) { + file = endpoint.getGridFs().findOne(new BasicDBObject("_id", file.getId())); + + Exchange exchange = endpoint.createExchange(); + exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, JSON.serialize(file.getMetaData())); + exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, file.getContentType()); + exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength()); + exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate()); + exchange.getIn().setBody(file.getInputStream(), InputStream.class); + try { + getProcessor().process(exchange); + 
//System.out.println("Processing " + file.getFilename()); + if (usesAttribute) { + forig.put(endpoint.getFileAttributeName(), "done"); + endpoint.getFilesCollection().save(forig); + } + if (usesTimestamp) { + if (file.getUploadDate().compareTo(fromDate) > 0) { + fromDate = file.getUploadDate(); + dateModified = true; + } + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + if (persistsTimestamp && dateModified) { + persistentTimestamp.put("timestamp", fromDate); + ptsCollection.save(persistentTimestamp); } Thread.sleep(endpoint.getDelay()); } http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java b/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java index 008e004..554c4cd 100644 --- a/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java +++ b/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java @@ -36,6 +36,14 @@ import org.slf4j.LoggerFactory; @UriEndpoint(scheme = "gridfs", title = "MongoDBGridFS", syntax = "gridfs:connectionBean", label = "database,nosql") public class GridFsEndpoint extends DefaultEndpoint { + + public enum QueryStrategy { + TimeStamp, + PersistentTimestamp, + FileAttribute, + TimeStampAndFileAttribute, + PersistentTimestampAndFileAttribute + }; public static final String GRIDFS_OPERATION = "gridfs.operation"; public static final String GRIDFS_METADATA = "gridfs.metadata"; public static final String GRIDFS_CHUNKSIZE = "gridfs.chunksize"; @@ -64,7 +72,16 @@ public class GridFsEndpoint extends DefaultEndpoint { @UriParam private long delay = 500; - + @UriParam + private QueryStrategy queryStrategy = QueryStrategy.TimeStamp; + @UriParam + private String persistentTSCollection = "camel-timestamps"; + @UriParam + private String 
persistentTSObject = "camel-timestamp"; + @UriParam + private String fileAttributeName = "camel-processed"; + + private Mongo mongoConnection; private DB db; private GridFS gridFs; @@ -154,6 +171,10 @@ public class GridFsEndpoint extends DefaultEndpoint { this.mongoConnection = mongoConnection; } + public DB getDB() { + return db; + } + public String getDatabase() { return database; } @@ -186,6 +207,31 @@ public class GridFsEndpoint extends DefaultEndpoint { this.initialDelay = delay; } + public void setQueryStrategy(String s) { + queryStrategy = QueryStrategy.valueOf(s); + } + public QueryStrategy getQueryStrategy() { + return queryStrategy; + } + public void setPersistentTSCollection(String s) { + persistentTSCollection = s; + } + public String getPersistentTSCollection() { + return persistentTSCollection; + } + public void setPersistentTSObject(String s) { + persistentTSObject = s; + } + public String getPersistentTSObject() { + return persistentTSObject; + } + public void setFileAttributeName(String f) { + fileAttributeName = f; + } + public String getFileAttributeName() { + return fileAttributeName; + } + /** * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones. * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)} method. 
http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java b/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java index a84260c..77b1c6e 100644 --- a/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java +++ b/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java @@ -22,6 +22,8 @@ package org.apache.camel.component.gridfs; import java.util.HashMap; import java.util.Map; +import com.mongodb.gridfs.GridFS; + import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -37,14 +39,36 @@ public class GridFsConsumerTest extends AbstractMongoDbTest { return new RouteBuilder() { public void configure() { from("direct:create").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket=" + getBucket()); + from("direct:create-a").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket=" + getBucket() + "-a"); + from("direct:create-pts").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket=" + getBucket() + "-pts"); + from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket()).convertBodyTo(String.class).to("mock:test"); + from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket() + "-a&queryStrategy=FileAttribute") + .convertBodyTo(String.class).to("mock:test"); + from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket() + "-pts&queryStrategy=PersistentTimestamp") + .convertBodyTo(String.class).to("mock:test"); } }; } @Test - public void test() throws Exception { + public void testTimestamp() throws Exception { + runTest("direct:create", gridfs); + } + @Test + @SuppressWarnings("deprecation") + public void testAttribute() throws Exception { + 
runTest("direct:create-a", new GridFS(mongo.getDB("test"), getBucket() + "-a")); + } + + @Test + @SuppressWarnings("deprecation") + public void testPersistentTS() throws Exception { + runTest("direct:create-pts", new GridFS(mongo.getDB("test"), getBucket() + "-pts")); + } + + public void runTest(String target, GridFS gridfs) throws Exception { MockEndpoint mock = getMockEndpoint("mock:test"); String data = "This is some stuff to go into the db"; mock.expectedMessageCount(1); @@ -55,7 +79,7 @@ public class GridFsConsumerTest extends AbstractMongoDbTest { assertEquals(0, gridfs.find(fn).size()); headers.put(Exchange.FILE_NAME, fn); - template.requestBodyAndHeaders("direct:create", data, headers); + template.requestBodyAndHeaders(target, data, headers); mock.assertIsSatisfied(); mock.reset(); @@ -64,11 +88,13 @@ public class GridFsConsumerTest extends AbstractMongoDbTest { mock.expectedBodiesReceived(data, data, data); headers.put(Exchange.FILE_NAME, fn + "_1"); - template.requestBodyAndHeaders("direct:create", data, headers); + template.requestBodyAndHeaders(target, data, headers); headers.put(Exchange.FILE_NAME, fn + "_2"); - template.requestBodyAndHeaders("direct:create", data, headers); + template.requestBodyAndHeaders(target, data, headers); headers.put(Exchange.FILE_NAME, fn + "_3"); - template.requestBodyAndHeaders("direct:create", data, headers); + template.requestBodyAndHeaders(target, data, headers); + mock.assertIsSatisfied(); + Thread.sleep(1000); mock.assertIsSatisfied(); }
