This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d24ee8  CAMEL-17121: converted mongodb-gridfs to the repeatable tasks 
(#6356)
6d24ee8 is described below

commit 6d24ee80f25df4d33ff02e4759e7be018dd5a0ee
Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com>
AuthorDate: Sat Oct 30 07:32:01 2021 +0200

    CAMEL-17121: converted mongodb-gridfs to the repeatable tasks (#6356)
    
    * CAMEL-17121: added support for unlimited iterations
    
    * CAMEL-17121: converted mongodb-gridfs to the repeatable tasks
---
 .../component/mongodb/gridfs/GridFsConsumer.java   | 179 ++++++++++++---------
 .../task/budget/IterationBoundedBudget.java        |  12 +-
 2 files changed, 111 insertions(+), 80 deletions(-)

diff --git 
a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
 
b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
index acfc588..fe7639e 100644
--- 
a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
+++ 
b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.mongodb.gridfs;
 
 import java.io.InputStream;
+import java.time.Duration;
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
 
@@ -32,7 +33,10 @@ import com.mongodb.client.model.Updates;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.util.IOHelper;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -71,7 +75,6 @@ public class GridFsConsumer extends DefaultConsumer 
implements Runnable {
 
     @Override
     public void run() {
-        MongoCursor<GridFSFile> cursor = null;
         Date fromDate = null;
 
         QueryStrategy queryStrategy = endpoint.getQueryStrategy();
@@ -103,92 +106,112 @@ public class GridFsConsumer extends DefaultConsumer 
implements Runnable {
         } else if (usesTimestamp) {
             fromDate = new Date();
         }
-        try {
-            Thread.sleep(endpoint.getInitialDelay());
-            while (isStarted()) {
-                if (cursor == null) {
-                    String queryString = endpoint.getQuery();
-                    Bson query = null;
-                    if (queryString != null) {
-                        query = Document.parse(queryString);
-                    }
-                    if (usesTimestamp) {
-                        Bson uploadDateFilter = 
Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE, fromDate);
-                        if (query == null) {
-                            query = uploadDateFilter;
-                        } else {
-                            query = Filters.and(query, uploadDateFilter);
-                        }
-                    }
-                    if (usesAttribute) {
-                        Bson fileAttributeNameFilter = 
Filters.eq(endpoint.getFileAttributeName(), null);
-                        if (query == null) {
-                            query = fileAttributeNameFilter;
-                        } else {
-                            query = Filters.and(query, 
fileAttributeNameFilter);
-                        }
-                    }
-                    cursor = endpoint.getGridFsBucket().find(query).cursor();
+
+        BlockingTask task = Tasks.foregroundTask()
+                .withBudget(Budgets.iterationBudget()
+                        
.withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+                        .withInterval(Duration.ofMillis(endpoint.getDelay()))
+                        
.withInitialDelay(Duration.ofMillis(endpoint.getInitialDelay()))
+                        .build())
+                .build();
+
+        MongoCollection<Document> finalPtsCollection = ptsCollection;
+        Date finalFromDate = fromDate;
+        Document finalPersistentTimestamp = persistentTimestamp;
+        task.run(() -> processCollection(finalFromDate, usesTimestamp, 
persistsTimestamp, usesAttribute, finalPtsCollection,
+                finalPersistentTimestamp));
+    }
+
+    private boolean processCollection(
+            Date fromDate, boolean usesTimestamp, boolean persistsTimestamp, 
boolean usesAttribute,
+            final MongoCollection<Document> ptsCollection, final Document 
persistentTimestamp) {
+
+        if (!isStarted()) {
+            return false;
+        }
+
+        try (MongoCursor<GridFSFile> cursor = 
getGridFSFileMongoCursor(fromDate, usesTimestamp, usesAttribute)) {
+            boolean dateModified = false;
+
+            while (cursor.hasNext() && isStarted()) {
+                GridFSFile file = cursor.next();
+                GridFSFile fOrig = file;
+                if (usesAttribute) {
+                    FindOneAndUpdateOptions options = new 
FindOneAndUpdateOptions();
+                    options.returnDocument(ReturnDocument.AFTER);
+                    Bson filter = Filters.and(eq("_id", file.getId()), 
eq(endpoint.getFileAttributeName(), null));
+                    Bson update = Updates.set(endpoint.getFileAttributeName(), 
GRIDFS_FILE_ATTRIBUTE_PROCESSING);
+                    fOrig = 
endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
                 }
-                boolean dateModified = false;
-                while (cursor.hasNext() && isStarted()) {
-                    GridFSFile file = cursor.next();
-                    GridFSFile forig = file;
-                    if (usesAttribute) {
-                        FindOneAndUpdateOptions options = new 
FindOneAndUpdateOptions();
-                        options.returnDocument(ReturnDocument.AFTER);
-                        Bson filter = Filters.and(eq("_id", file.getId()), 
eq(endpoint.getFileAttributeName(), null));
-                        Bson update = 
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_PROCESSING);
-                        forig = 
endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
-                    }
-                    if (forig != null) {
-                        Exchange exchange = createExchange(true);
-                        GridFSDownloadStream downloadStream = 
endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
-                        file = downloadStream.getGridFSFile();
-
-                        Document metadata = file.getMetadata();
-                        if (metadata != null) {
-                            String contentType = 
metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class);
-                            if (contentType != null) {
-                                
exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType);
-                            }
-                            
exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson());
+                if (fOrig != null) {
+                    Exchange exchange = createExchange(true);
+                    GridFSDownloadStream downloadStream = 
endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
+                    file = downloadStream.getGridFSFile();
+
+                    Document metadata = file.getMetadata();
+                    if (metadata != null) {
+                        String contentType = 
metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class);
+                        if (contentType != null) {
+                            
exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType);
                         }
+                        
exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson());
+                    }
 
-                        exchange.getIn().setHeader(Exchange.FILE_LENGTH, 
file.getLength());
-                        
exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate());
-                        exchange.getIn().setBody(downloadStream, 
InputStream.class);
-                        try {
-                            getProcessor().process(exchange);
-                            if (usesAttribute) {
-                                Bson update = 
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE);
-                                
endpoint.getFilesCollection().findOneAndUpdate(eq("_id", forig.getId()), 
update);
-                            }
-                            if (usesTimestamp) {
-                                if (file.getUploadDate().compareTo(fromDate) > 
0) {
-                                    fromDate = file.getUploadDate();
-                                    dateModified = true;
-                                }
-                            }
-                        } catch (Exception e) {
-                            // ignore
+                    exchange.getIn().setHeader(Exchange.FILE_LENGTH, 
file.getLength());
+                    exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, 
file.getUploadDate());
+                    exchange.getIn().setBody(downloadStream, 
InputStream.class);
+                    try {
+                        getProcessor().process(exchange);
+                        if (usesAttribute) {
+                            Bson update = 
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE);
+                            
endpoint.getFilesCollection().findOneAndUpdate(eq("_id", fOrig.getId()), 
update);
+                        }
+                        if (usesTimestamp && 
file.getUploadDate().compareTo(fromDate) > 0) {
+                            fromDate = file.getUploadDate();
+                            dateModified = true;
                         }
+                    } catch (Exception e) {
+                        // ignore
                     }
                 }
+            }
 
-                if (persistsTimestamp && dateModified) {
-                    Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY, 
fromDate);
-                    ptsCollection.findOneAndUpdate(eq("_id", 
persistentTimestamp.getObjectId("_id")), update);
-                }
+            if (persistsTimestamp && dateModified) {
+                Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY, fromDate);
+                ptsCollection.findOneAndUpdate(eq("_id", 
persistentTimestamp.getObjectId("_id")), update);
+            }
+        }
 
-                cursor = null;
-                Thread.sleep(endpoint.getDelay());
+        return false;
+    }
+
+    private MongoCursor<GridFSFile> getGridFSFileMongoCursor(Date fromDate, 
boolean usesTimestamp, boolean usesAttribute) {
+        String queryString = endpoint.getQuery();
+        Bson query = getBsonDocument(fromDate, usesTimestamp, usesAttribute, 
queryString);
+        return endpoint.getGridFsBucket().find(query).cursor();
+    }
+
+    private Bson getBsonDocument(Date fromDate, boolean usesTimestamp, boolean 
usesAttribute, String queryString) {
+        Bson query = null;
+        if (queryString != null) {
+            query = Document.parse(queryString);
+        }
+        if (usesTimestamp) {
+            Bson uploadDateFilter = Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE, 
fromDate);
+            if (query == null) {
+                query = uploadDateFilter;
+            } else {
+                query = Filters.and(query, uploadDateFilter);
             }
-        } catch (Exception e1) {
-            // ignore
         }
-        if (cursor != null) {
-            IOHelper.close(cursor);
+        if (usesAttribute) {
+            Bson fileAttributeNameFilter = 
Filters.eq(endpoint.getFileAttributeName(), null);
+            if (query == null) {
+                query = fileAttributeNameFilter;
+            } else {
+                query = Filters.and(query, fileAttributeNameFilter);
+            }
         }
+        return query;
     }
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
index 86ccee6..1225846 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
@@ -18,6 +18,8 @@
 package org.apache.camel.support.task.budget;
 
 public class IterationBoundedBudget implements IterationBudget {
+    public static final int UNLIMITED_ITERATIONS = -1;
+
     private final long initialDelay;
     private final long interval;
     private final int maxIterations;
@@ -51,7 +53,9 @@ public class IterationBoundedBudget implements 
IterationBudget {
     @Override
     public boolean next() {
         if (canContinue()) {
-            iterations++;
+            if (iterations != UNLIMITED_ITERATIONS) {
+                iterations++;
+            }
 
             return true;
         }
@@ -61,6 +65,10 @@ public class IterationBoundedBudget implements 
IterationBudget {
 
     @Override
     public boolean canContinue() {
-        return iterations < maxIterations;
+        if (maxIterations != UNLIMITED_ITERATIONS) {
+            return iterations < maxIterations;
+        }
+
+        return true;
     }
 }

Reply via email to