This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 423bfc805cfc CAMEL-22784 - Improve FileLockClusterService resilience 
to long blocking network based file I/O
423bfc805cfc is described below

commit 423bfc805cfc4072370a8806033f67b9cbb111b6
Author: James Netherton <[email protected]>
AuthorDate: Mon Dec 22 13:31:27 2025 +0000

    CAMEL-22784 - Improve FileLockClusterService resilience to long blocking 
network based file I/O
---
 components/camel-file/pom.xml                      |   5 +
 .../file/cluster/FileLockClusterLeaderInfo.java    |   9 ++
 .../file/cluster/FileLockClusterService.java       |  92 +++++++++++++
 .../file/cluster/FileLockClusterTaskExecutor.java  |  75 +++++++++++
 .../file/cluster/FileLockClusterUtils.java         |   6 +-
 .../file/cluster/FileLockClusterView.java          | 147 +++++++++++++++------
 .../cluster/FileLockClusterTaskExecutorTest.java   | 113 ++++++++++++++++
 .../file/cluster/FileLockClusterUtilsTest.java     |   3 +-
 ...FileLockClusterServiceAdvancedFailoverTest.java |  88 ------------
 .../FileLockClusterServiceBasicFailoverTest.java   |  55 ++++++--
 .../cluster/FileLockClusterServiceTestBase.java    |  24 +++-
 .../user-manual/modules/ROOT/pages/clustering.adoc |   3 +
 12 files changed, 480 insertions(+), 140 deletions(-)

diff --git a/components/camel-file/pom.xml b/components/camel-file/pom.xml
index c2b255ef0b2b..023f4648818e 100644
--- a/components/camel-file/pom.xml
+++ b/components/camel-file/pom.xml
@@ -53,6 +53,11 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 </project>
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
index 77f0ba55fe01..1a2c70387143 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
@@ -68,4 +68,13 @@ final class FileLockClusterLeaderInfo {
     public int hashCode() {
         return Objects.hashCode(id);
     }
+
+    @Override
+    public String toString() {
+        return "FileLockClusterLeaderInfo{" +
+               "id='" + id + '\'' +
+               ", heartbeatUpdateIntervalMilliseconds=" + 
heartbeatUpdateIntervalMilliseconds +
+               ", heartbeatMilliseconds=" + heartbeatMilliseconds +
+               '}';
+    }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
index e21568925f0a..3851aea2cdd6 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file.cluster;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -32,6 +33,10 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
     private TimeUnit acquireLockIntervalUnit;
     private ScheduledExecutorService executor;
     private int heartbeatTimeoutMultiplier;
+    private int clusterDataTaskMaxAttempts;
+    private long clusterDataTaskTimeout;
+    private TimeUnit clusterDataTaskTimeoutUnit;
+    private ExecutorService clusterDataTaskExecutor;
 
     public FileLockClusterService() {
         this.acquireLockDelay = 1;
@@ -39,6 +44,9 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
         this.acquireLockInterval = 10;
         this.acquireLockIntervalUnit = TimeUnit.SECONDS;
         this.heartbeatTimeoutMultiplier = 5;
+        this.clusterDataTaskMaxAttempts = 5;
+        this.clusterDataTaskTimeout = 10;
+        this.clusterDataTaskTimeoutUnit = TimeUnit.SECONDS;
     }
 
     @Override
@@ -120,6 +128,9 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
      * <p>
      */
     public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+        if (heartbeatTimeoutMultiplier <= 0) {
+            throw new IllegalArgumentException("HeartbeatTimeoutMultiplier 
must be greater than 0");
+        }
         this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
     }
 
@@ -127,6 +138,64 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
         return heartbeatTimeoutMultiplier;
     }
 
+    /**
+     * Sets how many times a cluster data task will run, counting both the first execution and subsequent retries in
+     * case of failure or timeout. The default is 5 attempts.
+     * <p>
+     * This can be useful when the cluster data root is on network based file storage, where I/O operations may
+     * occasionally block for long or unpredictable periods.
+     */
+    public void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) {
+        if (clusterDataTaskMaxAttempts <= 0) {
+            throw new IllegalArgumentException("clusterDataTaskMaxAttempts must be greater than 0");
+        }
+        this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts;
+    }
+
+    public int getClusterDataTaskMaxAttempts() {
+        return clusterDataTaskMaxAttempts;
+    }
+
+    /**
+     * Sets the timeout for a cluster data task (reading or writing cluster data). The default is 10 seconds.
+     * <p>
+     * Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block
+     * for long or unpredictable periods.
+     */
+    public void setClusterDataTaskTimeout(long clusterDataTaskTimeout) {
+        if (clusterDataTaskTimeout <= 0) {
+            throw new IllegalArgumentException("clusterDataTaskTimeout must be greater than 0");
+        }
+        this.clusterDataTaskTimeout = clusterDataTaskTimeout;
+    }
+
+    public long getClusterDataTaskTimeout() {
+        return clusterDataTaskTimeout;
+    }
+
+    /**
+     * The time unit for the clusterDataTaskTimeout, defaults to TimeUnit.SECONDS.
+     */
+    public void setClusterDataTaskTimeoutUnit(TimeUnit 
clusterDataTaskTimeoutUnit) {
+        this.clusterDataTaskTimeoutUnit = clusterDataTaskTimeoutUnit;
+    }
+
+    public TimeUnit getClusterDataTaskTimeoutUnit() {
+        return clusterDataTaskTimeoutUnit;
+    }
+
+    /**
+     * Sets the timeout and its time unit for a cluster data task (reading or writing cluster data). The default is 10 seconds.
+     * <p>
+     * Timeouts are useful when the cluster data root is on network storage, 
where I/O operations may occasionally block
+     * for long or unpredictable periods.
+     * <p>
+     */
+    public void setClusterDataTaskTimeout(long clusterDataTaskTimeout, 
TimeUnit clusterDataTaskTimeoutUnit) {
+        setClusterDataTaskTimeout(clusterDataTaskTimeout);
+        setClusterDataTaskTimeoutUnit(clusterDataTaskTimeoutUnit);
+    }
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -142,6 +211,16 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock
 
             executor = null;
         }
+
+        if (clusterDataTaskExecutor != null) {
+            if (context != null) {
+                context.getExecutorServiceManager().shutdown(clusterDataTaskExecutor);
+            } else {
+                clusterDataTaskExecutor.shutdown();
+            }
+            // Drop the reference so getClusterDataTaskExecutor() creates a fresh pool if the service is started again
+            clusterDataTaskExecutor = null;
+        }
     }
 
     ScheduledExecutorService getExecutor() {
@@ -161,4 +238,19 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
             internalLock.unlock();
         }
     }
+
+    ExecutorService getClusterDataTaskExecutor() {
+        Lock internalLock = getInternalLock();
+        internalLock.lock();
+        try {
+            if (clusterDataTaskExecutor == null) {
+                final CamelContext context = 
ObjectHelper.notNull(getCamelContext(), "CamelContext");
+                clusterDataTaskExecutor = 
context.getExecutorServiceManager().newFixedThreadPool(this,
+                        "FileLockClusterDataTask-" + getId(), 5);
+            }
+            return clusterDataTaskExecutor;
+        } finally {
+            internalLock.unlock();
+        }
+    }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
new file mode 100644
index 000000000000..cc7932989793
--- /dev/null
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes cluster data read / write tasks asynchronously, with timeouts to 
guard against potential unpredictable
+ * blocking I/O periods.
+ */
+class FileLockClusterTaskExecutor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileLockClusterTaskExecutor.class);
+    private final FileLockClusterService service;
+
+    FileLockClusterTaskExecutor(FileLockClusterService service) {
+        Objects.requireNonNull(service, "FileLockClusterService cannot be 
null");
+        this.service = service;
+    }
+
+    /**
+     * Runs the task asynchronously with a per-attempt timeout and a bounded number of attempts, so that long blocking
+     * I/O on a network based cluster data root (like an NFS mount) fails fast. Returns null if the caller is interrupted.
+     *
+     * @param task Supplier representing a task to run
+     */
+    <T> T run(Supplier<T> task) throws ExecutionException, TimeoutException {
+        Objects.requireNonNull(task, "Task cannot be null");
+
+        int maxAttempts = service.getClusterDataTaskMaxAttempts();
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            LOGGER.debug("Running cluster task attempt {} of {}", attempt, 
maxAttempts);
+
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(task, 
service.getClusterDataTaskExecutor());
+            try {
+                return future.get(service.getClusterDataTaskTimeout(), 
service.getClusterDataTaskTimeoutUnit());
+            } catch (InterruptedException e) {
+                LOGGER.trace("Cluster task interrupted on attempt {} of {}", 
attempt, maxAttempts);
+                future.cancel(true);
+                Thread.currentThread().interrupt();
+                return null;
+            } catch (ExecutionException | TimeoutException e) {
+                LOGGER.debug("Cluster task encountered an exception on attempt 
{} of {}", attempt, maxAttempts, e);
+                future.cancel(true);
+                if (attempt == maxAttempts) {
+                    LOGGER.debug("Cluster task retry limit ({}) reached", 
maxAttempts, e);
+                    throw e;
+                }
+            } finally {
+                LOGGER.debug("Cluster task attempt {} ended", attempt);
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
index fd3a7982cf9f..5a7466ceb622 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
@@ -62,7 +62,7 @@ final class FileLockClusterUtils {
             FileChannel channel,
             FileLockClusterLeaderInfo clusterLeaderInfo,
             boolean forceMetaData)
-            throws IOException {
+            throws Exception {
 
         Objects.requireNonNull(channel, "channel cannot be null");
         Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be 
null");
@@ -100,7 +100,7 @@ final class FileLockClusterUtils {
      *                        inconsistent state
      * @throws IOException    If reading the lock file failed
      */
-    static FileLockClusterLeaderInfo readClusterLeaderInfo(Path 
leaderDataPath) throws IOException {
+    static FileLockClusterLeaderInfo readClusterLeaderInfo(Path 
leaderDataPath) throws Exception {
         try {
             byte[] bytes = Files.readAllBytes(leaderDataPath);
 
@@ -119,7 +119,7 @@ final class FileLockClusterUtils {
             long lastHeartbeat = buf.getLong();
 
             return new FileLockClusterLeaderInfo(uuidStr, intervalMillis, 
lastHeartbeat);
-        } catch (NoSuchFileException e) {
+        } catch (FileNotFoundException | NoSuchFileException e) {
             // Handle NoSuchFileException to give the ClusterView a chance to 
recreate the leadership data
             return null;
         }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index 8156f00cfaff..80896e1f1c32 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -29,14 +29,18 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.apache.camel.util.function.ThrowingHelper;
+import org.apache.camel.util.function.ThrowingSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +61,7 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
     private ScheduledFuture<?> task;
     private int heartbeatTimeoutMultiplier;
     private long acquireLockIntervalMilliseconds;
+    private FileLockClusterTaskExecutor clusterTaskExecutor;
 
     FileLockClusterView(FileLockClusterService cluster, String namespace) {
         super(cluster, namespace);
@@ -88,6 +93,8 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
 
     @Override
     protected void doStart() throws Exception {
+        FileLockClusterService service = 
getClusterService().unwrap(FileLockClusterService.class);
+
         // Start critical section
         try {
             contextStartLock.lock();
@@ -97,31 +104,42 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                 fireLeadershipChangedEvent((CamelClusterMember) null);
             }
 
-            if (!Files.exists(leaderLockPath.getParent())) {
-                Files.createDirectories(leaderLockPath.getParent());
+            // Attempt to pre-create cluster data files & directories. On 
failure, it will either be attempted by another cluster member or run again 
within the tryLock task loop
+            try {
+                if (!Files.exists(leaderLockPath.getParent())) {
+                    Files.createDirectories(leaderLockPath.getParent());
+                }
+            } catch (IOException e) {
+                LOGGER.debug("Error creating directory {}", 
leaderLockPath.getParent(), e);
             }
 
-            if (!Files.exists(leaderLockPath)) {
-                Files.createFile(leaderLockPath);
+            try {
+                if (!Files.exists(leaderLockPath)) {
+                    Files.createFile(leaderLockPath);
+                }
+            } catch (IOException e) {
+                LOGGER.debug("Error creating cluster leader lock file {}", 
leaderLockPath, e);
             }
 
-            if (!Files.exists(leaderDataPath)) {
-                Files.createFile(leaderDataPath);
+            try {
+                if (!Files.exists(leaderDataPath)) {
+                    Files.createFile(leaderDataPath);
+                }
+            } catch (IOException e) {
+                LOGGER.debug("Error creating cluster leader data file {}", 
leaderDataPath, e);
             }
         } finally {
             // End critical section
             contextStartLock.unlock();
         }
 
-        FileLockClusterService service = 
getClusterService().unwrap(FileLockClusterService.class);
+        clusterTaskExecutor = new FileLockClusterTaskExecutor(service);
+
         acquireLockIntervalMilliseconds = TimeUnit.MILLISECONDS.convert(
                 service.getAcquireLockInterval(),
                 service.getAcquireLockIntervalUnit());
 
         heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier();
-        if (heartbeatTimeoutMultiplier <= 0) {
-            throw new IllegalArgumentException("HeartbeatTimeoutMultiplier 
must be greater than 0");
-        }
 
         ScheduledExecutorService executor = service.getExecutor();
         task = executor.scheduleWithFixedDelay(this::tryLock,
@@ -135,14 +153,20 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
     @Override
     protected void doStop() throws Exception {
         if (localMember.isLeader() && leaderDataFile != null) {
-            try {
-                FileChannel channel = leaderDataFile.getChannel();
-                channel.truncate(0);
-                channel.force(true);
-            } catch (Exception e) {
-                // Log and ignore since we need to release the file lock and 
do cleanup
-                LOGGER.debug("Failed to truncate {} on {} stop", 
leaderDataPath, getClass().getSimpleName(), e);
-            }
+            clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new 
ThrowingSupplier<Void, Throwable>() {
+                @Override
+                public Void get() throws Throwable {
+                    try {
+                        FileChannel channel = leaderDataFile.getChannel();
+                        channel.truncate(0);
+                        channel.force(true);
+                    } catch (Exception e) {
+                        // Log and ignore since we need to release the file 
lock and do cleanup
+                        LOGGER.debug("Failed to truncate {} on {} stop", 
leaderDataPath, getClass().getSimpleName(), e);
+                    }
+                    return null;
+                }
+            }));
         }
 
         closeInternal();
@@ -201,7 +225,7 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                         // Update the cluster data file with the leader state 
so that other cluster members can interrogate it
                         writeClusterLeaderInfo(false);
                         return;
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         LOGGER.debug("Failed writing cluster leader data to 
{}", leaderDataPath, e);
                     }
                 }
@@ -212,6 +236,7 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                             localMember.getUuid());
                     localMember.setStatus(ClusterMemberStatus.FOLLOWER);
                     fireLeadershipChangedEvent((CamelClusterMember) null);
+                    clusterLeaderInfoRef.set(null);
                     releaseFileLock();
                     closeLockFiles();
                     lock = null;
@@ -223,9 +248,8 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
 
                 // Get & update cluster leader state
                 LOGGER.debug("Reading cluster leader state from {}", 
leaderDataPath);
-                FileLockClusterLeaderInfo latestClusterLeaderInfo = 
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+                FileLockClusterLeaderInfo latestClusterLeaderInfo = 
readClusterLeaderInfo();
                 FileLockClusterLeaderInfo previousClusterLeaderInfo = 
clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo);
-
                 // Check if we can attempt to take cluster leadership
                 if (isLeaderStale(latestClusterLeaderInfo, 
previousClusterLeaderInfo)
                         || canReclaimLeadership(latestClusterLeaderInfo)) {
@@ -235,19 +259,17 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                     }
 
                     // Try to recreate the cluster data directory in case it 
got removed
-                    if (!Files.exists(leaderLockPath.getParent())) {
-                        Files.createDirectories(leaderLockPath.getParent());
-                    }
+                    createClusterRootDirectoryIfRequired();
 
                     // Attempt to obtain cluster leadership
                     LOGGER.debug("Try to acquire a lock on {} 
(cluster-member-id={})", leaderLockPath, localMember.getUuid());
-                    leaderLockFile = new 
RandomAccessFile(leaderLockPath.toFile(), "rw");
-                    leaderDataFile = new 
RandomAccessFile(leaderDataPath.toFile(), "rw");
+                    leaderLockFile = createRandomAccessFile(leaderLockPath);
+                    leaderDataFile = createRandomAccessFile(leaderDataPath);
 
                     lock = null;
                     lock = leaderLockFile.getChannel().tryLock(0, Math.max(1, 
leaderLockFile.getChannel().size()), false);
 
-                    if (lock != null) {
+                    if (lockIsValid()) {
                         LOGGER.info("Lock on file {} acquired (lock={}, 
cluster-member-id={})", leaderLockPath, lock,
                                 localMember.getUuid());
                         localMember.setStatus(ClusterMemberStatus.LEADER);
@@ -286,28 +308,67 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
         return leaderInfo != null && 
localMember.getUuid().equals(leaderInfo.getId());
     }
 
-    void writeClusterLeaderInfo(boolean forceMetaData) throws IOException {
+    void createClusterRootDirectoryIfRequired() throws ExecutionException, 
TimeoutException {
+        clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new 
ThrowingSupplier<Void, Throwable>() {
+            @Override
+            public Void get() throws Throwable {
+                if (!Files.exists(leaderLockPath.getParent())) {
+                    Files.createDirectories(leaderLockPath.getParent());
+                }
+                return null;
+            }
+        }));
+    }
+
+    RandomAccessFile createRandomAccessFile(Path path) throws 
ExecutionException, TimeoutException {
+        return clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new 
ThrowingSupplier<RandomAccessFile, Throwable>() {
+            @Override
+            public RandomAccessFile get() throws Throwable {
+                return new RandomAccessFile(path.toFile(), "rw");
+            }
+        }));
+    }
+
+    FileLockClusterLeaderInfo readClusterLeaderInfo() throws Exception {
+        return clusterTaskExecutor
+                .run(ThrowingHelper.wrapAsSupplier(new 
ThrowingSupplier<FileLockClusterLeaderInfo, Throwable>() {
+                    @Override
+                    public FileLockClusterLeaderInfo get() throws Throwable {
+                        return 
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+                    }
+                }));
+    }
+
+    void writeClusterLeaderInfo(boolean forceMetaData) throws Exception {
         FileLockClusterLeaderInfo latestClusterLeaderInfo = new 
FileLockClusterLeaderInfo(
                 localMember.getUuid(),
                 acquireLockIntervalMilliseconds,
                 System.currentTimeMillis());
 
-        FileLockClusterUtils.writeClusterLeaderInfo(
-                leaderDataPath,
-                leaderDataFile.getChannel(),
-                latestClusterLeaderInfo,
-                forceMetaData);
+        clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new 
ThrowingSupplier<Void, Throwable>() {
+            @Override
+            public Void get() throws Throwable {
+                FileLockClusterUtils.writeClusterLeaderInfo(
+                        leaderDataPath,
+                        leaderDataFile.getChannel(),
+                        latestClusterLeaderInfo,
+                        forceMetaData);
+                return null;
+            }
+        }));
     }
 
     boolean isLeaderInternal() {
         if (localMember.isLeader()) {
             try {
-                FileLockClusterLeaderInfo leaderInfo = 
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+                FileLockClusterLeaderInfo leaderInfo = readClusterLeaderInfo();
+                boolean leaderStale = isLeaderStale(leaderInfo, 
clusterLeaderInfoRef.getAndSet(leaderInfo));
+                LOGGER.debug("Leader read cluster data {}, isStale={}", 
leaderInfo, leaderStale);
+
                 return leaderInfo != null
-                        && lock != null
-                        && lock.isValid()
+                        && !leaderStale
                         && localMember.getUuid().equals(leaderInfo.getId())
-                        && Files.exists(leaderLockPath);
+                        && lockIsValid();
             } catch (Exception e) {
                 LOGGER.debug("Failed to read {} (cluster-member-id={})", 
leaderLockPath, localMember.getUuid(), e);
                 return false;
@@ -316,6 +377,26 @@ public class FileLockClusterView extends AbstractCamelClusterView {
         return false;
     }
 
+    /**
+     * Checks that the leader file lock is held and valid, and that the lock file still exists. The file existence
+     * check runs through the cluster task executor so that it is subject to the cluster data task timeout.
+     *
+     * @return {@code true} only when the lock is valid and the lock file exists
+     */
+    boolean lockIsValid() throws ExecutionException, TimeoutException {
+        if (lock != null && lock.isValid()) {
+            Boolean exists = clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new ThrowingSupplier<Boolean, Throwable>() {
+                @Override
+                public Boolean get() throws Throwable {
+                    return Files.exists(leaderLockPath);
+                }
+            }));
+            // The task executor yields null when the calling thread is interrupted; treat that as "not valid"
+            return Boolean.TRUE.equals(exists);
+        }
+        return false;
+    }
+
     private final class ClusterMember implements CamelClusterMember {
         private final AtomicReference<ClusterMemberStatus> status = new AtomicReference<>(ClusterMemberStatus.STOPPED);
         private final String uuid = UUID.randomUUID().toString();
diff --git 
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
 
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
new file mode 100644
index 000000000000..0b086b3ee1a3
--- /dev/null
+++ 
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class FileLockClusterTaskExecutorTest {
+    @Test
+    void runTaskWithDefaultMaxAttemptsAndTimeout() throws ExecutionException, 
TimeoutException {
+        FileLockClusterService service = new FileLockClusterService();
+        service.setCamelContext(new DefaultCamelContext());
+
+        FileLockClusterTaskExecutor executor = new 
FileLockClusterTaskExecutor(service);
+
+        String message = "Hello World";
+        String result = executor.run(new Supplier<String>() {
+            @Override
+            public String get() {
+                return message;
+            }
+        });
+
+        Assertions.assertEquals(message, result);
+    }
+
+    @Test
+    void runTaskWithMaxAttemptsExceeded() {
+        int maxAttempts = 3;
+        int timeoutMs = 100;
+
+        FileLockClusterService service = new FileLockClusterService();
+        service.setCamelContext(new DefaultCamelContext());
+        service.setClusterDataTaskMaxAttempts(maxAttempts);
+        service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+
+        FileLockClusterTaskExecutor executor = new 
FileLockClusterTaskExecutor(service);
+
+        AtomicInteger count = new AtomicInteger();
+        String message = "Hello World";
+
+        Assertions.assertThrows(TimeoutException.class, () -> {
+            executor.run(new Supplier<String>() {
+                @Override
+                public String get() {
+                    count.incrementAndGet();
+                    try {
+                        Thread.sleep(timeoutMs + 50);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    return message;
+                }
+            });
+        });
+
+        Assertions.assertEquals(3, count.get());
+    }
+
+    @Test
+    void runTaskWithMaxAttemptsNotExceeded() throws ExecutionException, 
TimeoutException {
+        int maxAttempts = 3;
+        int timeoutMs = 100;
+
+        FileLockClusterService service = new FileLockClusterService();
+        service.setCamelContext(new DefaultCamelContext());
+        service.setClusterDataTaskMaxAttempts(maxAttempts);
+        service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+
+        FileLockClusterTaskExecutor executor = new 
FileLockClusterTaskExecutor(service);
+
+        AtomicInteger count = new AtomicInteger();
+        String message = "Hello World";
+
+        String result = executor.run(new Supplier<String>() {
+            @Override
+            public String get() {
+                if (count.incrementAndGet() < 3) {
+                    try {
+                        Thread.sleep(timeoutMs + 50);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                return message;
+            }
+        });
+
+        Assertions.assertEquals(3, count.get());
+        Assertions.assertEquals(message, result);
+    }
+}
diff --git 
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
 
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
index 3e6f9db8a578..28fb79fc2923 100644
--- 
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
+++ 
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.file.cluster;
 
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -171,7 +170,7 @@ class FileLockClusterUtilsTest {
     }
 
     @Test
-    void writeClusterLeaderInfoData(@TempDir Path tempDir) throws IOException {
+    void writeClusterLeaderInfoData(@TempDir Path tempDir) throws Exception {
         Path clusterData = tempDir.resolve("leader.dat");
         try (RandomAccessFile raf = new RandomAccessFile(clusterData.toFile(), 
"rw")) {
             FileLockClusterLeaderInfo leaderInfo = new 
FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 2L);
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
index c335681eca85..29c9a60536bf 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -258,94 +258,6 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
         }
     }
 
-    @Test
-    void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir 
Path clusterMovedLocation) throws Exception {
-        ClusterConfig leaderConfig = new ClusterConfig();
-        leaderConfig.setTimerRepeatCount(-1);
-
-        CamelContext clusterLeader = createCamelContext(leaderConfig);
-
-        ClusterConfig followerConfig = new ClusterConfig();
-        followerConfig.setTimerRepeatCount(-1);
-        followerConfig.setAcquireLockDelay(2);
-
-        CamelContext clusterFollower = createCamelContext(followerConfig);
-
-        try {
-            MockEndpoint mockEndpointLeader = 
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
-            mockEndpointLeader.expectedMessageCount(5);
-
-            clusterLeader.start();
-            clusterFollower.start();
-
-            mockEndpointLeader.assertIsSatisfied();
-
-            AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new 
AtomicReference<>();
-            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() 
-> {
-                assertTrue(Files.exists(lockFile));
-                assertTrue(Files.exists(dataFile));
-                assertTrue(getClusterMember(clusterLeader).isLeader());
-
-                FileLockClusterLeaderInfo clusterLeaderInfo = 
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
-                assertNotNull(clusterLeaderInfo);
-                leaderInfo.set(clusterLeaderInfo);
-
-                String leaderId = clusterLeaderInfo.getId();
-                assertNotNull(leaderId);
-                assertDoesNotThrow(() -> UUID.fromString(leaderId));
-            });
-
-            // Wait enough time for the follower to have run its lock 
acquisition scheduled task
-            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
-
-            // The follower should not have produced any messages
-            MockEndpoint mockEndpointFollower = 
clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
-            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
-
-            mockEndpointLeader.reset();
-            mockEndpointLeader.expectedMinimumMessageCount(1);
-
-            // Simulate the file system becoming detached by moving the 
cluster data directory
-            Files.move(clusterDir, clusterMovedLocation, 
StandardCopyOption.REPLACE_EXISTING);
-
-            // Simulate reattaching the file system by moving the cluster 
directory back to the original location
-            try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
-                stream.forEach(path -> {
-                    try {
-                        Path destination = 
clusterDir.resolve(clusterMovedLocation.relativize(path));
-                        if (Files.isDirectory(path)) {
-                            Files.createDirectories(destination);
-                        } else {
-                            Files.copy(path, destination, 
StandardCopyOption.REPLACE_EXISTING);
-                        }
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
-            }
-
-            FileLockClusterLeaderInfo updatedInfo
-                    = new FileLockClusterLeaderInfo(
-                            leaderInfo.get().getId(), 
TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis());
-            Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
-            try (RandomAccessFile file = new RandomAccessFile(data.toFile(), 
"rw")) {
-                FileLockClusterUtils.writeClusterLeaderInfo(data, 
file.getChannel(), updatedInfo,
-                        true);
-            }
-
-            // Since the lock file is not considered 'stale', the original 
leader should resume leadership
-            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() 
-> {
-                assertTrue(getClusterMember(clusterLeader).isLeader());
-                mockEndpointLeader.assertIsSatisfied();
-            });
-
-            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
-        } finally {
-            clusterLeader.stop();
-            clusterFollower.stop();
-        }
-    }
-
     @Test
     void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path 
clusterMovedLocation) throws Exception {
         ClusterConfig leaderConfig = new ClusterConfig();
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
index 72b49bc1b486..5bb4da929715 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -221,28 +220,68 @@ class FileLockClusterServiceBasicFailoverTest extends 
FileLockClusterServiceTest
     }
 
     @Test
-    void negativeHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+    void negativeHeartbeatTimeoutMultiplierThrowsException() {
         ClusterConfig config = new ClusterConfig();
         config.setHeartbeatTimeoutMultiplier(-1);
-
-        Exception exception = assertThrows(Exception.class, () -> {
+        assertThrows(IllegalArgumentException.class, () -> {
             try (CamelContext camelContext = createCamelContext(config)) {
                 camelContext.start();
             }
         });
-        assertIsInstanceOf(IllegalArgumentException.class, 
exception.getCause());
     }
 
     @Test
-    void zeroHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+    void zeroHeartbeatTimeoutMultiplierThrowsException() {
         ClusterConfig config = new ClusterConfig();
         config.setHeartbeatTimeoutMultiplier(0);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void negativeClusterDataTaskMaxAttemptsThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setClusterDataTaskMaxAttempts(-1);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void zeroClusterDataTaskMaxAttemptsThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setClusterDataTaskMaxAttempts(0);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
+
+    @Test
+    void negativeClusterDataTaskTimeoutThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setClusterDataTaskTimeout(-1);
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+    }
 
-        Exception exception = assertThrows(Exception.class, () -> {
+    @Test
+    void zeroClusterDataTaskTimeoutThrowsException() {
+        ClusterConfig config = new ClusterConfig();
+        config.setClusterDataTaskTimeout(0);
+        assertThrows(IllegalArgumentException.class, () -> {
             try (CamelContext camelContext = createCamelContext(config)) {
                 camelContext.start();
             }
         });
-        assertIsInstanceOf(IllegalArgumentException.class, 
exception.getCause());
     }
 }
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
index 72290aaacbbb..b7f7bfbbde55 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
@@ -67,6 +67,8 @@ abstract class FileLockClusterServiceTestBase {
         service.setAcquireLockInterval(1);
         service.setRoot(clusterDir.toString());
         
service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier());
+        
service.setClusterDataTaskMaxAttempts(config.getClusterDataTaskMaxAttempts());
+        service.setClusterDataTaskTimeout(config.getClusterDataTaskTimeout());
         return service;
     }
 
@@ -83,6 +85,8 @@ abstract class FileLockClusterServiceTestBase {
         private long acquireLockDelay = 1;
         private long timerRepeatCount = 5;
         private int heartbeatTimeoutMultiplier = 5;
+        private int clusterDataTaskMaxAttempts = 5;
+        private long clusterDataTaskTimeout = 5;
 
         long getAcquireLockDelay() {
             return acquireLockDelay;
@@ -104,12 +108,28 @@ abstract class FileLockClusterServiceTestBase {
             return TimeUnit.SECONDS.toMillis(getAcquireLockDelay()) + 500;
         }
 
-        public int getHeartbeatTimeoutMultiplier() {
+        int getHeartbeatTimeoutMultiplier() {
             return heartbeatTimeoutMultiplier;
         }
 
-        public void setHeartbeatTimeoutMultiplier(int 
heartbeatTimeoutMultiplier) {
+        void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
             this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
         }
+
+        int getClusterDataTaskMaxAttempts() {
+            return this.clusterDataTaskMaxAttempts;
+        }
+
+        void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) {
+            this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts;
+        }
+
+        long getClusterDataTaskTimeout() {
+            return clusterDataTaskTimeout;
+        }
+
+        void setClusterDataTaskTimeout(long clusterDataTaskTimeout) {
+            this.clusterDataTaskTimeout = clusterDataTaskTimeout;
+        }
     }
 }
diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc 
b/docs/user-manual/modules/ROOT/pages/clustering.adoc
index 669a56d13216..322443e064c1 100644
--- a/docs/user-manual/modules/ROOT/pages/clustering.adoc
+++ b/docs/user-manual/modules/ROOT/pages/clustering.adoc
@@ -65,6 +65,9 @@ Configuration options:
 | acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | 
TimeUnit
 | acquireLockInterval | The time to wait between attempts to try to acquire 
the cluster lock | 10 | long
 | acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | 
TimeUnit
+| clusterDataTaskMaxAttempts | Sets how many times a cluster data task will 
run, counting both the first execution and subsequent retries in case of 
failure or timeout. This can be useful when the cluster data root is on network 
based file storage, where I/O operations may occasionally block for long or 
unpredictable periods | 5 | int
+| clusterDataTaskTimeout | Sets the timeout for a cluster data task (reading 
or writing cluster data). Timeouts are useful when the cluster data root is on 
network storage, where I/O operations may occasionally block for long or 
unpredictable periods | 10 | long
+| clusterDataTaskTimeoutUnit | The time unit for 
clusterDataTaskTimeout | SECONDS | TimeUnit
 | heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader 
acquireLockInterval to determine how long followers should wait before 
considering the leader "stale". For example, if the leader updates its 
heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers 
will tolerate up to code 2s * 3 = 6s of silence before declaring the leader 
unavailable | 5 | int
 | rootPath | The file cluster root directory path | | String
 |===


Reply via email to