CAMEL-8727: File consumer - Add read lock that is based on idempotent repository
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78ccf133 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78ccf133 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78ccf133 Branch: refs/heads/master Commit: 78ccf13325d4e159df9161b68e0fc7602de5e6d8 Parents: 1c20c43 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 1 14:42:33 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 3 10:52:34 2015 +0200 ---------------------------------------------------------------------- .../component/file/GenericFileEndpoint.java | 20 ++- .../GenericFileExclusiveReadLockStrategy.java | 25 ++- ...ileIdempotentRepositoryReadLockStrategy.java | 160 +++++++++++++++++++ .../FileLockExclusiveReadLockStrategy.java | 9 +- .../strategy/FileProcessStrategyFactory.java | 12 ++ .../FileRenameExclusiveReadLockStrategy.java | 30 +++- .../GenericFileProcessStrategySupport.java | 6 +- ...ericFileRenameExclusiveReadLockStrategy.java | 13 +- .../GenericFileRenameProcessStrategy.java | 2 +- .../MarkerFileExclusiveReadLockStrategy.java | 18 ++- 10 files changed, 272 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 8f1d458..0eafd12 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -154,7 +154,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple protected Comparator<GenericFile<T>> 
sorter; @UriParam(label = "consumer") protected Comparator<Exchange> sortBy; - @UriParam(label = "consumer", enums = "none,markerFile,fileLock,rename,changed") + @UriParam(label = "consumer", enums = "none,markerFile,fileLock,rename,changed,idempotent") protected String readLock = "none"; @UriParam(label = "consumer", defaultValue = "1000") protected long readLockCheckInterval = 1000; @@ -168,6 +168,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple protected long readLockMinLength = 1; @UriParam(label = "consumer", defaultValue = "0") protected long readLockMinAge; + @UriParam(label = "consumer", defaultValue = "true") + protected boolean readLockRemoveOnRollback = true; @UriParam(label = "consumer") protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy; @UriParam(label = "consumer") @@ -1210,6 +1212,9 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple if (readLock != null) { params.put("readLock", readLock); } + if ("idempotent".equals(readLock)) { + params.put("readLockIdempotentRepository", idempotentRepository); + } if (readLockCheckInterval > 0) { params.put("readLockCheckInterval", readLockCheckInterval); } @@ -1220,7 +1225,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple params.put("readLockMinLength", readLockMinLength); params.put("readLockLoggingLevel", readLockLoggingLevel); params.put("readLockMinAge", readLockMinAge); - + params.put("readLockRemoveOnRollback", readLockRemoveOnRollback); return params; } @@ -1331,14 +1336,21 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple + " to ensure that the read lock procedure has enough time to acquire the lock."); } } + if ("idempotent".equals(readLock) && idempotentRepository == null) { + throw new IllegalArgumentException("IdempotentRepository must be configured when using readLock=idempotent"); + } - ServiceHelper.startServices(inProgressRepository, 
idempotentRepository); + // idempotent repository may be used by others, so add it as a service so its stopped when CamelContext stops + if (idempotentRepository != null) { + getCamelContext().addService(idempotentRepository, true); + } + ServiceHelper.startServices(inProgressRepository); super.doStart(); } @Override protected void doStop() throws Exception { super.doStop(); - ServiceHelper.stopServices(inProgressRepository, idempotentRepository); + ServiceHelper.stopServices(inProgressRepository); } } http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java index 36875f5..cf950d2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileExclusiveReadLockStrategy.java @@ -31,6 +31,7 @@ import org.apache.camel.LoggingLevel; * <li>FileLockExclusiveReadLockStrategy acquiring a RW file lock for the duration of the processing.</li> * <li>MarkerFileExclusiveReadLockStrategy using a marker file for acquiring read lock.</li> * <li>FileChangedExclusiveReadLockStrategy using a file changed detection for acquiring read lock.</li> + * <li>FileIdempotentRepositoryReadLockStrategy using a {@link org.apache.camel.spi.IdempotentRepository} to hold the read locks which allows to support clustering.</li> * </ul> */ public interface GenericFileExclusiveReadLockStrategy<T> { @@ -57,14 +58,34 @@ public interface GenericFileExclusiveReadLockStrategy<T> { boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception; /** 
- * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method. + * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to an abort operation (acquireExclusiveReadLock returned false). * * @param operations generic file operations * @param file the file * @param exchange the exchange * @throws Exception can be thrown in case of errors */ - void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception; + void releaseExclusiveReadLockOnAbort(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception; + + /** + * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to a rollback operation (Exchange processing failed) + * + * @param operations generic file operations + * @param file the file + * @param exchange the exchange + * @throws Exception can be thrown in case of errors + */ + void releaseExclusiveReadLockOnRollback(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception; + + /** + * Releases the exclusive read lock granted by the <tt>acquireExclusiveReadLock</tt> method due to a commit operation (Exchange processing succeeded) + * + * @param operations generic file operations + * @param file the file + * @param exchange the exchange + * @throws Exception can be thrown in case of errors + */ + void releaseExclusiveReadLockOnCommit(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception; /** * Sets an optional timeout period. 
http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java new file mode 100644 index 0000000..8e043b8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.file.strategy; + +import java.io.File; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileEndpoint; +import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.CamelLogger; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file read lock that uses an {@link org.apache.camel.spi.IdempotentRepository} as the lock strategy. This allows plugging in and using existing + * idempotent repositories that, for example, support clustering. The other read lock strategies that are using marker files or file locks + * are not guaranteed to work in a clustered setup across various platforms and file systems. 
+ */ +public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware { + + private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class); + + private LoggingLevel loggingLevel = LoggingLevel.TRACE; + private CamelContext camelContext; + private IdempotentRepository<String> idempotentRepository; + private boolean removeOnRollback = true; + + @Override + public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { + LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint); + } + + @Override + public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // check if we can begin on this file + String key = asKey(file); + boolean answer = idempotentRepository.add(key); + CamelLogger.log(LOG, loggingLevel, "acquireExclusiveReadLock: " + key + " -> " + answer); + return answer; + } + + @Override + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnAbort: " + key); + } + + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + if (removeOnRollback) { + String key = asKey(file); + CamelLogger.log(LOG, loggingLevel, "releaseExclusiveReadLockOnRollback: " + key); + idempotentRepository.remove(key); + } + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + String key = asKey(file); + CamelLogger.log(LOG, loggingLevel, 
"releaseExclusiveReadLockOnCommit: " + key); + idempotentRepository.contains(key); + } + + public void setTimeout(long timeout) { + // noop + } + + public void setCheckInterval(long checkInterval) { + // noop + } + + public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { + this.loggingLevel = readLockLoggingLevel; + } + + public void setMarkerFiler(boolean markerFile) { + // noop + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public IdempotentRepository<String> getIdempotentRepository() { + return idempotentRepository; + } + + /** + * The idempotent repository to use as the store for the read locks. + */ + public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) { + this.idempotentRepository = idempotentRepository; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. + */ + public boolean isRemoveOnRollback() { + return removeOnRollback; + } + + /** + * Whether to remove the file from the idempotent repository when doing a rollback. + * <p/> + * By default this is true. 
+ */ + public void setRemoveOnRollback(boolean removeOnRollback) { + this.removeOnRollback = removeOnRollback; + } + + protected String asKey(GenericFile<File> file) { + return file.getAbsoluteFilePath(); + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext", this); + ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); + + // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself + camelContext.addService(idempotentRepository, true); + } + + @Override + protected void doStop() throws Exception { + // noop + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java index ae5b0ff..8fd94f5 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java @@ -123,7 +123,7 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + target, LOG); // and also must release super lock - super.releaseExclusiveReadLock(operations, file, exchange); + super.releaseExclusiveReadLockOnAbort(operations, file, exchange); } } @@ -135,11 +135,10 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo } @Override - public void releaseExclusiveReadLock(GenericFileOperations<File> operations, - GenericFile<File> file, 
Exchange exchange) throws Exception { - + protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations, + GenericFile<File> file, Exchange exchange) throws Exception { // must call super - super.releaseExclusiveReadLock(operations, file, exchange); + super.doReleaseExclusiveReadLock(operations, file, exchange); String target = file.getFileName(); FileLock lock = exchange.getProperty(Exchange.FILE_LOCK_EXCLUSIVE_LOCK, FileLock.class); http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index ed5bd4e..5a31374 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -24,6 +24,7 @@ import org.apache.camel.Expression; import org.apache.camel.LoggingLevel; import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; import org.apache.camel.component.file.GenericFileProcessStrategy; +import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.spi.Language; import org.apache.camel.util.ObjectHelper; @@ -127,6 +128,17 @@ public final class FileProcessStrategyFactory { readLockStrategy.setMinAge(minAge); } strategy = readLockStrategy; + } else if ("idempotent".equals(readLock)) { + FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + 
IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + strategy = readLockStrategy; } if (strategy != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java index a2bbc49..f4e4ed7 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java @@ -49,18 +49,40 @@ public class FileRenameExclusiveReadLockStrategy extends GenericFileRenameExclus } @Override - public void releaseExclusiveReadLock(GenericFileOperations<File> operations, - GenericFile<File> file, Exchange exchange) throws Exception { + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { // must call marker first try { if (markerFile) { - marker.releaseExclusiveReadLock(operations, file, exchange); + marker.releaseExclusiveReadLockOnAbort(operations, file, exchange); } } finally { - super.releaseExclusiveReadLock(operations, file, exchange); + super.releaseExclusiveReadLockOnAbort(operations, file, exchange); } } + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // must call marker first + try { + if (markerFile) { + marker.releaseExclusiveReadLockOnRollback(operations, file, exchange); + } + } finally { + 
super.releaseExclusiveReadLockOnRollback(operations, file, exchange); + } + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + // must call marker first + try { + if (markerFile) { + marker.releaseExclusiveReadLockOnCommit(operations, file, exchange); + } + } finally { + super.releaseExclusiveReadLockOnCommit(operations, file, exchange); + } + } @Override public void setMarkerFiler(boolean markerFile) { http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java index 504593f..cbe125e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java @@ -62,7 +62,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil // must release lock last if (exclusiveReadLockStrategy != null) { - exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + exclusiveReadLockStrategy.releaseExclusiveReadLockOnAbort(operations, file, exchange); } } @@ -72,7 +72,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil // must release lock last if (exclusiveReadLockStrategy != null) { - exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + exclusiveReadLockStrategy.releaseExclusiveReadLockOnCommit(operations, file, exchange); } } @@ -82,7 +82,7 @@ public abstract class GenericFileProcessStrategySupport<T> 
implements GenericFil // must release lock last if (exclusiveReadLockStrategy != null) { - exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java index fe8343a..062c7ee 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameExclusiveReadLockStrategy.java @@ -88,8 +88,17 @@ public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFil } @Override - public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file, - Exchange exchange) throws Exception { + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception { + // noop + } + + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception { + // noop + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<T> operations, GenericFile<T> file, Exchange exchange) throws Exception { // noop } http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java index 7378392..9271b0f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java @@ -67,7 +67,7 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat } } finally { if (exclusiveReadLockStrategy != null) { - exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + exclusiveReadLockStrategy.releaseExclusiveReadLockOnRollback(operations, file, exchange); } deleteLocalWorkFile(exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/78ccf133/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java index 6deebff..9792df0 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java @@ -76,8 +76,22 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive } @Override - public void releaseExclusiveReadLock(GenericFileOperations<File> operations, - GenericFile<File> file, Exchange exchange) throws Exception { + public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange) throws Exception { + doReleaseExclusiveReadLock(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + doReleaseExclusiveReadLock(operations, file, exchange); + } + + @Override + public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { + doReleaseExclusiveReadLock(operations, file, exchange); + } + + protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations, + GenericFile<File> file, Exchange exchange) throws Exception { if (!markerFile) { // if not using marker file then nothing to release return;