CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/411c944c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/411c944c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/411c944c Branch: refs/heads/master Commit: 411c944c8117de0a6ed42f5df1e864b31929375e Parents: ad736cd Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 3 10:48:17 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 3 10:52:37 2015 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 3 ++- ...ileIdempotentRepositoryReadLockStrategy.java | 23 +++++++++----------- 2 files changed, 12 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/411c944c/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 80c597b..43acaad 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -893,7 +893,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple /** * Logging level used when a read lock could not be acquired. - * By default a WARN is logged. You can change this level, for example to OFF to not have any logging. + * By default a WARN is logged. + * You can change this level, for example to OFF to not have any logging. * This option is only applicable for readLock of types: changed, fileLock, rename. */ public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { http://git-wip-us.apache.org/repos/asf/camel/blob/411c944c/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index 4088c0c..d2b1b6f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -43,7 +43,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class); private GenericFileEndpoint<File> endpoint; - private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG; + private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; private CamelContext camelContext; private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; @@ -57,10 +57,17 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp @Override public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // in clustered mode then another node may have processed the file so we must check here again if the file exists + File path = file.getFile(); + if (!path.exists()) { + return false; + } + // check if we can begin on this file String key = asKey(file); boolean answer = idempotentRepository.add(key); if (!answer) { + // another node is processing the file so skip CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file); } return answer; @@ -98,16 +105,6 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp // noop } - /** - * Sets logging level used when a read lock could not be acquired. - * <p/> - * Logging level used when a read lock could not be acquired. - * <p/> - * The default logging level is DEBUG as it may be more common not to be able to acquire a read lock - * when using idempotent repository in a clustered setup, as another node may be processing the file. - * - * @param readLockLoggingLevel LoggingLevel - */ public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { this.readLockLoggingLevel = readLockLoggingLevel; } @@ -159,7 +156,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp /** * Whether to remove the file from the idempotent repository when doing a commit. * <p/> - * By default this is commit. + * By default this is true. */ public boolean isRemoveOnCommit() { return removeOnCommit; @@ -168,7 +165,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp /** * Whether to remove the file from the idempotent repository when doing a commit. * <p/> - * By default this is commit. + * By default this is true. */ public void setRemoveOnCommit(boolean removeOnCommit) { this.removeOnCommit = removeOnCommit;