This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.11.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit d016665e72c58839cf2b53df593b8895a0edcf2c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jun 24 09:25:27 2021 +0200 CAMEL-16750: camel-file - file idempotent read-locks should be more robust if acquiring read-lock fails due to an exception such as when using a shared database and there is a duplicate key violation as another node has the lock. --- ...dempotentChangedRepositoryReadLockStrategy.java | 25 ++++++++++++++-------- ...IdempotentRenameRepositoryReadLockStrategy.java | 11 ++++++++-- .../FileIdempotentRepositoryReadLockStrategy.java | 17 ++++++++++----- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java index 75003fe..09215d0 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java @@ -86,7 +86,14 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp // check if we can begin on this file String key = asKey(file); - boolean answer = idempotentRepository.add(exchange, key); + boolean answer = false; + try { + answer = idempotentRepository.add(exchange, key); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cannot acquire read lock due to " + e.getMessage() + ". Will skip the file: " + file, e); + } + } if (!answer) { // another node is processing the file so skip CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock.
Will skip the file: " + file); @@ -96,7 +103,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp // if we acquired during idempotent then check changed also answer = changed.acquireExclusiveReadLock(operations, file, exchange); if (!answer) { - // remove from idempontent as we did not acquire it from changed + // remove from idempotent as we did not acquire it from changed idempotentRepository.remove(exchange, key); } } @@ -119,23 +126,23 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp if (removeOnRollback) { idempotentRepository.remove(exchange, key); } else { - // okay we should not remove then confirm it instead + // okay we should not remove then confirm instead idempotentRepository.confirm(exchange, key); } try { changed.releaseExclusiveReadLockOnRollback(operations, file, exchange); } catch (Exception e) { - LOG.warn("Error during releasing exclusive readlock on rollback. This exception is ignored.", e); + LOG.warn("Error during releasing exclusive read lock on rollback. 
This exception is ignored.", e); } }; if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { - LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", + LOG.debug("Scheduling read lock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); } else if (readLockIdempotentReleaseDelay > 0) { - LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + LOG.debug("Delaying read lock release task {} millis", readLockIdempotentReleaseDelay); Thread.sleep(readLockIdempotentReleaseDelay); r.run(); } else { @@ -159,16 +166,16 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp try { changed.releaseExclusiveReadLockOnCommit(operations, file, exchange); } catch (Exception e) { - LOG.warn("Error during releasing exclusive readlock on rollback. This exception is ignored.", e); + LOG.warn("Error during releasing exclusive read lock on rollback. 
This exception is ignored.", e); } }; if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { - LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", + LOG.debug("Scheduling read lock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); } else if (readLockIdempotentReleaseDelay > 0) { - LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + LOG.debug("Delaying read lock release task {} millis", readLockIdempotentReleaseDelay); Thread.sleep(readLockIdempotentReleaseDelay); r.run(); } else { diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java index 6cf4cfa..82d653d 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java @@ -78,7 +78,14 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo // check if we can begin on this file String key = asKey(file); - boolean answer = idempotentRepository.add(exchange, key); + boolean answer = false; + try { + answer = idempotentRepository.add(exchange, key); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cannot acquire read lock due to " + e.getMessage() + ". Will skip the file: " + file, e); + } + } if (!answer) { // another node is processing the file so skip CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. 
Will skip the file: " + file); @@ -88,7 +95,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo // if we acquired during idempotent then check rename also answer = rename.acquireExclusiveReadLock(operations, file, exchange); if (!answer) { - // remove from idempontent as we did not acquire it from changed + // remove from idempotent as we did not acquire it from changed idempotentRepository.remove(exchange, key); } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index 5316f55..dacf232 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -75,7 +75,14 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport // check if we can begin on this file String key = asKey(file); - boolean answer = idempotentRepository.add(exchange, key); + boolean answer = false; + try { + answer = idempotentRepository.add(exchange, key); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Cannot acquire read lock due to " + e.getMessage() + ". Will skip the file: " + file, e); + } + } if (!answer) { // another node is processing the file so skip CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. 
Will skip the file: " + file); @@ -105,11 +112,11 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport }; if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { - LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", + LOG.debug("Scheduling read lock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); } else if (readLockIdempotentReleaseDelay > 0) { - LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + LOG.debug("Delaying read lock release task {} millis", readLockIdempotentReleaseDelay); Thread.sleep(readLockIdempotentReleaseDelay); r.run(); } else { @@ -132,11 +139,11 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport }; if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { - LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", + LOG.debug("Scheduling read lock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); } else if (readLockIdempotentReleaseDelay > 0) { - LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); + LOG.debug("Delaying read lock release task {} millis", readLockIdempotentReleaseDelay); Thread.sleep(readLockIdempotentReleaseDelay); r.run(); } else {