This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d016665e72c58839cf2b53df593b8895a0edcf2c
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Jun 24 09:25:27 2021 +0200

    CAMEL-16750: camel-file - file idempotent read-locks should be more robust 
if acquiring the read-lock fails due to an exception, such as when using a shared 
database and there is a duplicate key violation because another node has the lock.
---
 ...dempotentChangedRepositoryReadLockStrategy.java | 25 ++++++++++++++--------
 ...IdempotentRenameRepositoryReadLockStrategy.java | 11 ++++++++--
 .../FileIdempotentRepositoryReadLockStrategy.java  | 17 ++++++++++-----
 3 files changed, 37 insertions(+), 16 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
index 75003fe..09215d0 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
@@ -86,7 +86,14 @@ public class FileIdempotentChangedRepositoryReadLockStrategy 
extends ServiceSupp
 
         // check if we can begin on this file
         String key = asKey(file);
-        boolean answer = idempotentRepository.add(exchange, key);
+        boolean answer = false;
+        try {
+            answer = idempotentRepository.add(exchange, key);
+        } catch (Exception e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Cannot acquire read lock due to " + e.getMessage() 
+ ". Will skip the file: " + file, e);
+            }
+        }
         if (!answer) {
             // another node is processing the file so skip
             CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read 
lock. Will skip the file: " + file);
@@ -96,7 +103,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy 
extends ServiceSupp
             // if we acquired during idempotent then check changed also
             answer = changed.acquireExclusiveReadLock(operations, file, 
exchange);
             if (!answer) {
-                // remove from idempontent as we did not acquire it from 
changed
+                // remove from idempotent as we did not acquire it from changed
                 idempotentRepository.remove(exchange, key);
             }
         }
@@ -119,23 +126,23 @@ public class 
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
             if (removeOnRollback) {
                 idempotentRepository.remove(exchange, key);
             } else {
-                // okay we should not remove then confirm it instead
+                // okay we should not remove then confirm instead
                 idempotentRepository.confirm(exchange, key);
             }
 
             try {
                 changed.releaseExclusiveReadLockOnRollback(operations, file, 
exchange);
             } catch (Exception e) {
-                LOG.warn("Error during releasing exclusive readlock on 
rollback. This exception is ignored.", e);
+                LOG.warn("Error during releasing exclusive read lock on 
rollback. This exception is ignored.", e);
             }
         };
 
         if (readLockIdempotentReleaseDelay > 0 && 
readLockIdempotentReleaseExecutorService != null) {
-            LOG.debug("Scheduling readlock release task to run asynchronous 
delayed after {} millis",
+            LOG.debug("Scheduling read lock release task to run asynchronous 
delayed after {} millis",
                     readLockIdempotentReleaseDelay);
             readLockIdempotentReleaseExecutorService.schedule(r, 
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
         } else if (readLockIdempotentReleaseDelay > 0) {
-            LOG.debug("Delaying readlock release task {} millis", 
readLockIdempotentReleaseDelay);
+            LOG.debug("Delaying read lock release task {} millis", 
readLockIdempotentReleaseDelay);
             Thread.sleep(readLockIdempotentReleaseDelay);
             r.run();
         } else {
@@ -159,16 +166,16 @@ public class 
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
             try {
                 changed.releaseExclusiveReadLockOnCommit(operations, file, 
exchange);
             } catch (Exception e) {
-                LOG.warn("Error during releasing exclusive readlock on 
rollback. This exception is ignored.", e);
+                LOG.warn("Error during releasing exclusive read lock on 
rollback. This exception is ignored.", e);
             }
         };
 
         if (readLockIdempotentReleaseDelay > 0 && 
readLockIdempotentReleaseExecutorService != null) {
-            LOG.debug("Scheduling readlock release task to run asynchronous 
delayed after {} millis",
+            LOG.debug("Scheduling read lock release task to run asynchronous 
delayed after {} millis",
                     readLockIdempotentReleaseDelay);
             readLockIdempotentReleaseExecutorService.schedule(r, 
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
         } else if (readLockIdempotentReleaseDelay > 0) {
-            LOG.debug("Delaying readlock release task {} millis", 
readLockIdempotentReleaseDelay);
+            LOG.debug("Delaying read lock release task {} millis", 
readLockIdempotentReleaseDelay);
             Thread.sleep(readLockIdempotentReleaseDelay);
             r.run();
         } else {
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
index 6cf4cfa..82d653d 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
@@ -78,7 +78,14 @@ public class FileIdempotentRenameRepositoryReadLockStrategy 
extends ServiceSuppo
 
         // check if we can begin on this file
         String key = asKey(file);
-        boolean answer = idempotentRepository.add(exchange, key);
+        boolean answer = false;
+        try {
+            answer = idempotentRepository.add(exchange, key);
+        } catch (Exception e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Cannot acquire read lock due to " + e.getMessage() 
+ ". Will skip the file: " + file, e);
+            }
+        }
         if (!answer) {
             // another node is processing the file so skip
             CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read 
lock. Will skip the file: " + file);
@@ -88,7 +95,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy 
extends ServiceSuppo
             // if we acquired during idempotent then check rename also
             answer = rename.acquireExclusiveReadLock(operations, file, 
exchange);
             if (!answer) {
-                // remove from idempontent as we did not acquire it from 
changed
+                // remove from idempotent as we did not acquire it from changed
                 idempotentRepository.remove(exchange, key);
             }
         }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 5316f55..dacf232 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -75,7 +75,14 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
 
         // check if we can begin on this file
         String key = asKey(file);
-        boolean answer = idempotentRepository.add(exchange, key);
+        boolean answer = false;
+        try {
+            answer = idempotentRepository.add(exchange, key);
+        } catch (Exception e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Cannot acquire read lock due to " + e.getMessage() 
+ ". Will skip the file: " + file, e);
+            }
+        }
         if (!answer) {
             // another node is processing the file so skip
             CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read 
lock. Will skip the file: " + file);
@@ -105,11 +112,11 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
         };
 
         if (readLockIdempotentReleaseDelay > 0 && 
readLockIdempotentReleaseExecutorService != null) {
-            LOG.debug("Scheduling readlock release task to run asynchronous 
delayed after {} millis",
+            LOG.debug("Scheduling read lock release task to run asynchronous 
delayed after {} millis",
                     readLockIdempotentReleaseDelay);
             readLockIdempotentReleaseExecutorService.schedule(r, 
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
         } else if (readLockIdempotentReleaseDelay > 0) {
-            LOG.debug("Delaying readlock release task {} millis", 
readLockIdempotentReleaseDelay);
+            LOG.debug("Delaying read lock release task {} millis", 
readLockIdempotentReleaseDelay);
             Thread.sleep(readLockIdempotentReleaseDelay);
             r.run();
         } else {
@@ -132,11 +139,11 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
         };
 
         if (readLockIdempotentReleaseDelay > 0 && 
readLockIdempotentReleaseExecutorService != null) {
-            LOG.debug("Scheduling readlock release task to run asynchronous 
delayed after {} millis",
+            LOG.debug("Scheduling read lock release task to run asynchronous 
delayed after {} millis",
                     readLockIdempotentReleaseDelay);
             readLockIdempotentReleaseExecutorService.schedule(r, 
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
         } else if (readLockIdempotentReleaseDelay > 0) {
-            LOG.debug("Delaying readlock release task {} millis", 
readLockIdempotentReleaseDelay);
+            LOG.debug("Delaying read lock release task {} millis", 
readLockIdempotentReleaseDelay);
             Thread.sleep(readLockIdempotentReleaseDelay);
             r.run();
         } else {

Reply via email to