CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f24eee8b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f24eee8b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f24eee8b Branch: refs/heads/master Commit: f24eee8beb158c0ef2c259e2fb22369f79f53bb9 Parents: 9f86d16 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 3 09:32:07 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 3 10:52:36 2015 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 33 +++++++++++++++++- ...ileIdempotentRepositoryReadLockStrategy.java | 36 ++++++++++++++++++-- .../strategy/FileProcessStrategyFactory.java | 4 +++ 3 files changed, 70 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 0eafd12..48512e9 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -169,6 +169,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple @UriParam(label = "consumer", defaultValue = "0") protected long readLockMinAge; @UriParam(label = "consumer", defaultValue = "true") + protected boolean readLockRemoveOnCommit = true; + @UriParam(label = "consumer", defaultValue = "true") protected boolean readLockRemoveOnRollback = true; @UriParam(label = "consumer") protected GenericFileExclusiveReadLockStrategy<T> 
exclusiveReadLockStrategy; @@ -904,7 +906,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple /** * This option applied only for readLock=change. - * This options allows to specify a minimum age the file must be before attempting to acquire the read lock. + * This option allows to specify a minimum age the file must be before attempting to acquire the read lock. * For example use readLockMinAge=300s to require the file is at last 5 minutes old. * This can speedup the changed read lock as it will only attempt to acquire files which are at least that given age. */ @@ -912,6 +914,34 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple this.readLockMinAge = readLockMinAge; } + public boolean isReadLockRemoveOnCommit() { + return readLockRemoveOnCommit; + } + + /** + * This option applied only for readLock=idempotent. + * This option allows to specify whether to remove the file name entry from the idempotent repository + * when the file was processed successfully and is committed. Setting this to <tt>false</tt> allows + * to use the read lock as both read lock and idempotent consumer at the same time, as previously + * processed file will be kept in the idempotent repository so the same file is not processed again. + */ + public void setReadLockRemoveOnCommit(boolean readLockRemoveOnCommit) { + this.readLockRemoveOnCommit = readLockRemoveOnCommit; + } + + public boolean isReadLockRemoveOnRollback() { + return readLockRemoveOnRollback; + } + + /** + * This option applied only for readLock=idempotent. + * This option allows to specify whether to remove the file name entry from the idempotent repository + * when processing the file failed and a rollback happens. 
+ */ + public void setReadLockRemoveOnRollback(boolean readLockRemoveOnRollback) { + this.readLockRemoveOnRollback = readLockRemoveOnRollback; + } + public int getBufferSize() { return bufferSize; } @@ -1225,6 +1255,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockMinLength", readLockMinLength); params.put("readLockLoggingLevel", readLockLoggingLevel); params.put("readLockMinAge", readLockMinAge); + params.put("readLockRemoveOnCommit", readLockRemoveOnCommit); params.put("readLockRemoveOnRollback", readLockRemoveOnRollback); return params; } http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index 8e043b8..28f4865 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -42,13 +42,16 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class); + private GenericFileEndpoint<File> endpoint; private LoggingLevel loggingLevel = LoggingLevel.TRACE; private CamelContext camelContext; private IdempotentRepository<String> idempotentRepository; private boolean removeOnRollback = true; + private boolean removeOnCommit = true; @Override public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { + this.endpoint = 
endpoint; LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint); } @@ -80,7 +83,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { String key = asKey(file); CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnCommit: " + key); - idempotentRepository.contains(key); + if (removeOnCommit) { + idempotentRepository.remove(key); + } else { + // if not remove then confirm + idempotentRepository.confirm(key); + } } public void setTimeout(long timeout) { @@ -139,8 +147,32 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport imp this.removeOnRollback = removeOnRollback; } + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is true. + */ + public boolean isRemoveOnCommit() { + return removeOnCommit; + } + + /** + * Whether to remove the file from the idempotent repository when doing a commit. + * <p/> + * By default this is true. 
+ */ + public void setRemoveOnCommit(boolean removeOnCommit) { + this.removeOnCommit = removeOnCommit; + } + protected String asKey(GenericFile<File> file) { - return file.getAbsoluteFilePath(); + // use absolute file path as default key, but evaluate if an expression key was configured + String key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + return key; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/f24eee8b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index 5a31374..6987905 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -130,6 +130,10 @@ public final class FileProcessStrategyFactory { strategy = readLockStrategy; } else if ("idempotent".equals(readLock)) { FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy(); + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); if (readLockRemoveOnRollback != null) { readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback);