CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/504b0d84 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/504b0d84 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/504b0d84 Branch: refs/heads/master Commit: 504b0d84078944c7e632316d57a58fcb220ca494 Parents: 0fa7d69 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon May 4 19:40:20 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon May 4 19:40:31 2015 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 20 +++++++++++++++ ...ileIdempotentRepositoryReadLockStrategy.java | 27 ++++++++++++++++++-- .../strategy/FileProcessStrategyFactory.java | 4 +++ 3 files changed, 49 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 9cd7b4f..ae5381b 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -173,6 +173,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple @UriParam(label = "consumer", defaultValue = "true") protected boolean readLockRemoveOnRollback = true; @UriParam(label = "consumer") + protected boolean readLockRemoveOnCommit; + @UriParam(label = "consumer") protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy; @UriParam(label = "consumer") protected ExceptionHandler onCompletionExceptionHandler; @@ -942,6 +944,23 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple this.readLockRemoveOnRollback = readLockRemoveOnRollback; } + public boolean isReadLockRemoveOnCommit() { + return readLockRemoveOnCommit; + } + + /** + * This option applied only for readLock=idempotent. + * This option allows to specify whether to remove the file name entry from the idempotent repository + * when processing the file is succeeded and a commit happens. + * <p/> + * By default the file is not removed which ensures that any race-condition do not occur so another active + * node may attempt to grab the file. Instead the idempotent repository may support eviction strategies + * that you can configure to evict the file name entry after X minutes - this ensures no problems with race conditions. + */ + public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) { + this.readLockRemoveOnCommit = readLockRemoveOnCommit; + } + public int getBufferSize() { return bufferSize; } @@ -1256,6 +1275,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockLoggingLevel", readLockLoggingLevel); params.put("readLockMinAge", readLockMinAge); params.put("readLockRemoveOnRollback", readLockRemoveOnRollback); + params.put("readLockRemoveOnCommit", readLockRemoveOnCommit); return params; } http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index 763b7e0..b9cf193 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -47,6 +47,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp private CamelContext camelContext; private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; + private boolean removeOnCommit; @Override public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { @@ -91,8 +92,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp @Override public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); - // confirm on commit - idempotentRepository.confirm(key); + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // confirm on commit + idempotentRepository.confirm(key); + } } public void setTimeout(long timeout) { @@ -151,6 +156,24 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp this.removeOnRollback = removeOnRollback; } + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public boolean isRemoveOnCommit() { + return removeOnCommit; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is false. + */ + public void setRemoveOnCommit(boolean removeOnCommit) { + this.removeOnCommit = removeOnCommit; + } + protected String asKey(GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression key was configured String key = file.getAbsoluteFilePath(); http://git-wip-us.apache.org/repos/asf/camel/blob/504b0d84/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index 5a31374..f9ceca2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -134,6 +134,10 @@ public final class FileProcessStrategyFactory { if (readLockRemoveOnRollback != null) { readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); if (repo != null) { readLockStrategy.setIdempotentRepository(repo);