CAMEL-8727: File consumer - Add read lock that is based on idempotent repository


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/411c944c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/411c944c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/411c944c

Branch: refs/heads/master
Commit: 411c944c8117de0a6ed42f5df1e864b31929375e
Parents: ad736cd
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun May 3 10:48:17 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun May 3 10:52:37 2015 +0200

----------------------------------------------------------------------
 .../component/file/GenericFileEndpoint.java     |  3 ++-
 ...ileIdempotentRepositoryReadLockStrategy.java | 23 +++++++++-----------
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/411c944c/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 80c597b..43acaad 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -893,7 +893,8 @@ public abstract class GenericFileEndpoint<T> extends 
ScheduledPollEndpoint imple
 
     /**
      * Logging level used when a read lock could not be acquired.
-     * By default a WARN is logged. You can change this level, for example to 
OFF to not have any logging.
+     * By default a WARN is logged.
+     * You can change this level, for example to OFF to not have any logging.
      * This option is only applicable for readLock of types: changed, 
fileLock, rename.
      */
     public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {

http://git-wip-us.apache.org/repos/asf/camel/blob/411c944c/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 4088c0c..d2b1b6f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -43,7 +43,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends 
ServiceSupport imp
     private static final transient Logger LOG = 
LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class);
 
     private GenericFileEndpoint<File> endpoint;
-    private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG;
+    private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
     private CamelContext camelContext;
     private IdempotentRepository<String> idempotentRepository;
     private boolean removeOnRollback = true;
@@ -57,10 +57,17 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport imp
 
     @Override
     public boolean acquireExclusiveReadLock(GenericFileOperations<File> 
operations, GenericFile<File> file, Exchange exchange) throws Exception {
+        // in clustered mode then another node may have processed the file so 
we must check here again if the file exists
+        File path = file.getFile();
+        if (!path.exists()) {
+            return false;
+        }
+
         // check if we can begin on this file
         String key = asKey(file);
         boolean answer = idempotentRepository.add(key);
         if (!answer) {
+            // another node is processing the file so skip
             CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read 
lock. Will skip the file: " + file);
         }
         return answer;
@@ -98,16 +105,6 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport imp
         // noop
     }
 
-    /**
-     * Sets logging level used when a read lock could not be acquired.
-     * <p/>
-     * Logging level used when a read lock could not be acquired.
-     * <p/>
-     * The default logging level is DEBUG as it may be more common not to be 
able to acquire a read lock
-     * when using idempotent repository in a clustered setup, as another node 
may be processing the file.
-     *
-     * @param readLockLoggingLevel LoggingLevel
-     */
     public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
         this.readLockLoggingLevel = readLockLoggingLevel;
     }
@@ -159,7 +156,7 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport imp
     /**
      * Whether to remove the file from the idempotent repository when doing a 
commit.
      * <p/>
-     * By default this is commit.
+     * By default this is true.
      */
     public boolean isRemoveOnCommit() {
         return removeOnCommit;
@@ -168,7 +165,7 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport imp
     /**
      * Whether to remove the file from the idempotent repository when doing a 
commit.
      * <p/>
-     * By default this is commit.
+     * By default this is true.
      */
     public void setRemoveOnCommit(boolean removeOnCommit) {
         this.removeOnCommit = removeOnCommit;

Reply via email to