This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6d24ee8 CAMEL-17121: converted mongodb-gridfs to the repeatable tasks (#6356) 6d24ee8 is described below commit 6d24ee80f25df4d33ff02e4759e7be018dd5a0ee Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Sat Oct 30 07:32:01 2021 +0200 CAMEL-17121: converted mongodb-gridfs to the repeatable tasks (#6356) * CAMEL-17121: added support for unlimited iterations * CAMEL-17121: converted mongodb-gridfs to the repeatable tasks --- .../component/mongodb/gridfs/GridFsConsumer.java | 179 ++++++++++++--------- .../task/budget/IterationBoundedBudget.java | 12 +- 2 files changed, 111 insertions(+), 80 deletions(-) diff --git a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java index acfc588..fe7639e 100644 --- a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java +++ b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.mongodb.gridfs; import java.io.InputStream; +import java.time.Duration; import java.util.Date; import java.util.concurrent.ExecutorService; @@ -32,7 +33,10 @@ import com.mongodb.client.model.Updates; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; -import org.apache.camel.util.IOHelper; +import org.apache.camel.support.task.BlockingTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; +import org.apache.camel.support.task.budget.IterationBoundedBudget; import org.bson.Document; import org.bson.conversions.Bson; @@ -71,7 +75,6 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable { @Override public void 
run() { - MongoCursor<GridFSFile> cursor = null; Date fromDate = null; QueryStrategy queryStrategy = endpoint.getQueryStrategy(); @@ -103,92 +106,112 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable { } else if (usesTimestamp) { fromDate = new Date(); } - try { - Thread.sleep(endpoint.getInitialDelay()); - while (isStarted()) { - if (cursor == null) { - String queryString = endpoint.getQuery(); - Bson query = null; - if (queryString != null) { - query = Document.parse(queryString); - } - if (usesTimestamp) { - Bson uploadDateFilter = Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE, fromDate); - if (query == null) { - query = uploadDateFilter; - } else { - query = Filters.and(query, uploadDateFilter); - } - } - if (usesAttribute) { - Bson fileAttributeNameFilter = Filters.eq(endpoint.getFileAttributeName(), null); - if (query == null) { - query = fileAttributeNameFilter; - } else { - query = Filters.and(query, fileAttributeNameFilter); - } - } - cursor = endpoint.getGridFsBucket().find(query).cursor(); + + BlockingTask task = Tasks.foregroundTask() + .withBudget(Budgets.iterationBudget() + .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS) + .withInterval(Duration.ofMillis(endpoint.getDelay())) + .withInitialDelay(Duration.ofMillis(endpoint.getInitialDelay())) + .build()) + .build(); + + MongoCollection<Document> finalPtsCollection = ptsCollection; + Date finalFromDate = fromDate; + Document finalPersistentTimestamp = persistentTimestamp; + task.run(() -> processCollection(finalFromDate, usesTimestamp, persistsTimestamp, usesAttribute, finalPtsCollection, + finalPersistentTimestamp)); + } + + private boolean processCollection( + Date fromDate, boolean usesTimestamp, boolean persistsTimestamp, boolean usesAttribute, + final MongoCollection<Document> ptsCollection, final Document persistentTimestamp) { + + if (!isStarted()) { + return false; + } + + try (MongoCursor<GridFSFile> cursor = getGridFSFileMongoCursor(fromDate, usesTimestamp, 
usesAttribute)) { + boolean dateModified = false; + + while (cursor.hasNext() && isStarted()) { + GridFSFile file = cursor.next(); + GridFSFile fOrig = file; + if (usesAttribute) { + FindOneAndUpdateOptions options = new FindOneAndUpdateOptions(); + options.returnDocument(ReturnDocument.AFTER); + Bson filter = Filters.and(eq("_id", file.getId()), eq(endpoint.getFileAttributeName(), null)); + Bson update = Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_PROCESSING); + fOrig = endpoint.getFilesCollection().findOneAndUpdate(filter, update, options); } - boolean dateModified = false; - while (cursor.hasNext() && isStarted()) { - GridFSFile file = cursor.next(); - GridFSFile forig = file; - if (usesAttribute) { - FindOneAndUpdateOptions options = new FindOneAndUpdateOptions(); - options.returnDocument(ReturnDocument.AFTER); - Bson filter = Filters.and(eq("_id", file.getId()), eq(endpoint.getFileAttributeName(), null)); - Bson update = Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_PROCESSING); - forig = endpoint.getFilesCollection().findOneAndUpdate(filter, update, options); - } - if (forig != null) { - Exchange exchange = createExchange(true); - GridFSDownloadStream downloadStream = endpoint.getGridFsBucket().openDownloadStream(file.getFilename()); - file = downloadStream.getGridFSFile(); - - Document metadata = file.getMetadata(); - if (metadata != null) { - String contentType = metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class); - if (contentType != null) { - exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType); - } - exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson()); + if (fOrig != null) { + Exchange exchange = createExchange(true); + GridFSDownloadStream downloadStream = endpoint.getGridFsBucket().openDownloadStream(file.getFilename()); + file = downloadStream.getGridFSFile(); + + Document metadata = file.getMetadata(); + if (metadata != null) { + String contentType = 
metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class); + if (contentType != null) { + exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType); } + exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson()); + } - exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength()); - exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate()); - exchange.getIn().setBody(downloadStream, InputStream.class); - try { - getProcessor().process(exchange); - if (usesAttribute) { - Bson update = Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE); - endpoint.getFilesCollection().findOneAndUpdate(eq("_id", forig.getId()), update); - } - if (usesTimestamp) { - if (file.getUploadDate().compareTo(fromDate) > 0) { - fromDate = file.getUploadDate(); - dateModified = true; - } - } - } catch (Exception e) { - // ignore + exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength()); + exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate()); + exchange.getIn().setBody(downloadStream, InputStream.class); + try { + getProcessor().process(exchange); + if (usesAttribute) { + Bson update = Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE); + endpoint.getFilesCollection().findOneAndUpdate(eq("_id", fOrig.getId()), update); + } + if (usesTimestamp && file.getUploadDate().compareTo(fromDate) > 0) { + fromDate = file.getUploadDate(); + dateModified = true; } + } catch (Exception e) { + // ignore } } + } - if (persistsTimestamp && dateModified) { - Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY, fromDate); - ptsCollection.findOneAndUpdate(eq("_id", persistentTimestamp.getObjectId("_id")), update); - } + if (persistsTimestamp && dateModified) { + Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY, fromDate); + ptsCollection.findOneAndUpdate(eq("_id", persistentTimestamp.getObjectId("_id")), update); + } + } - cursor = null; - Thread.sleep(endpoint.getDelay()); + return 
false; + } + + private MongoCursor<GridFSFile> getGridFSFileMongoCursor(Date fromDate, boolean usesTimestamp, boolean usesAttribute) { + String queryString = endpoint.getQuery(); + Bson query = getBsonDocument(fromDate, usesTimestamp, usesAttribute, queryString); + return endpoint.getGridFsBucket().find(query).cursor(); + } + + private Bson getBsonDocument(Date fromDate, boolean usesTimestamp, boolean usesAttribute, String queryString) { + Bson query = null; + if (queryString != null) { + query = Document.parse(queryString); + } + if (usesTimestamp) { + Bson uploadDateFilter = Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE, fromDate); + if (query == null) { + query = uploadDateFilter; + } else { + query = Filters.and(query, uploadDateFilter); } - } catch (Exception e1) { - // ignore } - if (cursor != null) { - IOHelper.close(cursor); + if (usesAttribute) { + Bson fileAttributeNameFilter = Filters.eq(endpoint.getFileAttributeName(), null); + if (query == null) { + query = fileAttributeNameFilter; + } else { + query = Filters.and(query, fileAttributeNameFilter); + } } + return query; } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java index 86ccee6..1225846 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java @@ -18,6 +18,8 @@ package org.apache.camel.support.task.budget; public class IterationBoundedBudget implements IterationBudget { + public static final int UNLIMITED_ITERATIONS = -1; + private final long initialDelay; private final long interval; private final int maxIterations; @@ -51,7 +53,9 @@ public class IterationBoundedBudget implements IterationBudget { @Override public boolean next() { if (canContinue()) { - iterations++; + if (iterations != 
UNLIMITED_ITERATIONS) { + iterations++; + } return true; } @@ -61,6 +65,10 @@ public class IterationBoundedBudget implements IterationBudget { @Override public boolean canContinue() { - return iterations < maxIterations; + if (maxIterations != UNLIMITED_ITERATIONS) { + return iterations < maxIterations; + } + + return true; } }