CAMEL-6413: Fixed a race condition in the file consumer; the read lock release must be executed last.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/afb28228 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/afb28228 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/afb28228 Branch: refs/heads/camel-2.11.x Commit: afb28228adb55e27465906e3e4c7697b643fe4b6 Parents: ea2aee8 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jun 2 11:42:08 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jun 2 12:29:34 2013 +0200 ---------------------------------------------------------------------- .../strategy/GenericFileDeleteProcessStrategy.java | 95 ++++++++------ .../GenericFileProcessStrategySupport.java | 19 ++-- .../strategy/GenericFileRenameProcessStrategy.java | 61 +++++---- 3 files changed, 99 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java index 26b3850..b2ccb8e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileDeleteProcessStrategy.java @@ -50,54 +50,67 @@ public class GenericFileDeleteProcessStrategy<T> extends GenericFileProcessStrat @Override public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { - // must invoke super - super.commit(operations, endpoint, exchange, file); - - int retries = 3; - boolean deleted = false; 
- - while (retries > 0 && !deleted) { - retries--; - - if (operations.deleteFile(file.getAbsoluteFilePath())) { - // file is deleted - deleted = true; - break; + try { + deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); + + int retries = 3; + boolean deleted = false; + + while (retries > 0 && !deleted) { + retries--; + + if (operations.deleteFile(file.getAbsoluteFilePath())) { + // file is deleted + deleted = true; + break; + } + + // some OS can report false when deleting but the file is still deleted + // use exists to check instead + boolean exits = operations.existsFile(file.getAbsoluteFilePath()); + if (!exits) { + deleted = true; + } else { + log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file); + // sleep a bit and try again + Thread.sleep(1000); + } } - - // some OS can report false when deleting but the file is still deleted - // use exists to check instead - boolean exits = operations.existsFile(file.getAbsoluteFilePath()); - if (!exits) { - deleted = true; - } else { - log.trace("File was not deleted at this attempt will try again in 1 sec.: {}", file); - // sleep a bit and try again - Thread.sleep(1000); + if (!deleted) { + throw new GenericFileOperationFailedException("Cannot delete file: " + file); + } + } finally { + // must release lock last + if (exclusiveReadLockStrategy != null) { + exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); } - } - - if (!deleted) { - throw new GenericFileOperationFailedException("Cannot delete file: " + file); } } @Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { - // must invoke super - super.rollback(operations, endpoint, exchange, file); - - // moved the failed file if specifying the moveFailed option - if (failureRenamer != null) { - // create a copy and bind the file to the exchange to be used by the renamer to 
evaluate the file name - Exchange copy = exchange.copy(); - file.bindToExchange(copy); - // must preserve message id - copy.getIn().setMessageId(exchange.getIn().getMessageId()); - copy.setExchangeId(exchange.getExchangeId()); - - GenericFile<T> newName = failureRenamer.renameFile(copy, file); - renameFile(operations, file, newName); + try { + deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); + + // moved the failed file if specifying the moveFailed option + if (failureRenamer != null) { + // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name + Exchange copy = exchange.copy(); + file.bindToExchange(copy); + // must preserve message id + copy.getIn().setMessageId(exchange.getIn().getMessageId()); + copy.setExchangeId(exchange.getExchangeId()); + + GenericFile<T> newName = failureRenamer.renameFile(copy, file); + renameFile(operations, file, newName); + } + } finally { + // must release lock last + if (exclusiveReadLockStrategy != null) { + exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + } } } http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java index 06254dc..b1ddcc2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java @@ -57,30 +57,33 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil } public void abort(GenericFileOperations<T> operations, 
GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { + deleteLocalWorkFile(exchange); + operations.releaseRetreivedFileResources(exchange); + + // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); } + } + public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); - } - public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { + // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); } + } + public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { deleteLocalWorkFile(exchange); operations.releaseRetreivedFileResources(exchange); - } - public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { + // must release lock last if (exclusiveReadLockStrategy != null) { exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); } - - deleteLocalWorkFile(exchange); - operations.releaseRetreivedFileResources(exchange); } public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() { @@ -115,7 +118,7 @@ public abstract class GenericFileProcessStrategySupport<T> implements GenericFil return to; } - private void deleteLocalWorkFile(Exchange exchange) { + protected void deleteLocalWorkFile(Exchange exchange) { // delete local work file, if it was used (eg by ftp component) File local = exchange.getIn().getHeader(Exchange.FILE_LOCAL_WORK_PATH, File.class); if (local != null && 
local.exists()) { http://git-wip-us.apache.org/repos/asf/camel/blob/afb28228/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java index 106918a..7378392 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileRenameProcessStrategy.java @@ -31,7 +31,6 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat @Override public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { - // must invoke super boolean result = super.begin(operations, endpoint, exchange, file); if (!result) { @@ -52,37 +51,45 @@ public class GenericFileRenameProcessStrategy<T> extends GenericFileProcessStrat @Override public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { - // must invoke super - super.rollback(operations, endpoint, exchange, file); - - if (failureRenamer != null) { - // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name - Exchange copy = exchange.copy(); - file.bindToExchange(copy); - // must preserve message id - copy.getIn().setMessageId(exchange.getIn().getMessageId()); - copy.setExchangeId(exchange.getExchangeId()); - - GenericFile<T> newName = failureRenamer.renameFile(copy, file); - renameFile(operations, file, newName); + try { + operations.releaseRetreivedFileResources(exchange); + + if (failureRenamer != null) { + // create a copy 
and bind the file to the exchange to be used by the renamer to evaluate the file name + Exchange copy = exchange.copy(); + file.bindToExchange(copy); + // must preserve message id + copy.getIn().setMessageId(exchange.getIn().getMessageId()); + copy.setExchangeId(exchange.getExchangeId()); + + GenericFile<T> newName = failureRenamer.renameFile(copy, file); + renameFile(operations, file, newName); + } + } finally { + if (exclusiveReadLockStrategy != null) { + exclusiveReadLockStrategy.releaseExclusiveReadLock(operations, file, exchange); + } + deleteLocalWorkFile(exchange); } } @Override public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception { - // must invoke super - super.commit(operations, endpoint, exchange, file); - - if (commitRenamer != null) { - // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name - Exchange copy = exchange.copy(); - file.bindToExchange(copy); - // must preserve message id - copy.getIn().setMessageId(exchange.getIn().getMessageId()); - copy.setExchangeId(exchange.getExchangeId()); - - GenericFile<T> newName = commitRenamer.renameFile(copy, file); - renameFile(operations, file, newName); + try { + if (commitRenamer != null) { + // create a copy and bind the file to the exchange to be used by the renamer to evaluate the file name + Exchange copy = exchange.copy(); + file.bindToExchange(copy); + // must preserve message id + copy.getIn().setMessageId(exchange.getIn().getMessageId()); + copy.setExchangeId(exchange.getExchangeId()); + + GenericFile<T> newName = commitRenamer.renameFile(copy, file); + renameFile(operations, file, newName); + } + } finally { + // must invoke super + super.commit(operations, endpoint, exchange, file); } }