This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 423bfc805cfc CAMEL-22784 - Improve FileLockClusterService resilience
to long blocking network based file I/O
423bfc805cfc is described below
commit 423bfc805cfc4072370a8806033f67b9cbb111b6
Author: James Netherton <[email protected]>
AuthorDate: Mon Dec 22 13:31:27 2025 +0000
CAMEL-22784 - Improve FileLockClusterService resilience to long blocking
network based file I/O
---
components/camel-file/pom.xml | 5 +
.../file/cluster/FileLockClusterLeaderInfo.java | 9 ++
.../file/cluster/FileLockClusterService.java | 92 +++++++++++++
.../file/cluster/FileLockClusterTaskExecutor.java | 75 +++++++++++
.../file/cluster/FileLockClusterUtils.java | 6 +-
.../file/cluster/FileLockClusterView.java | 147 +++++++++++++++------
.../cluster/FileLockClusterTaskExecutorTest.java | 113 ++++++++++++++++
.../file/cluster/FileLockClusterUtilsTest.java | 3 +-
...FileLockClusterServiceAdvancedFailoverTest.java | 88 ------------
.../FileLockClusterServiceBasicFailoverTest.java | 55 ++++++--
.../cluster/FileLockClusterServiceTestBase.java | 24 +++-
.../user-manual/modules/ROOT/pages/clustering.adoc | 3 +
12 files changed, 480 insertions(+), 140 deletions(-)
diff --git a/components/camel-file/pom.xml b/components/camel-file/pom.xml
index c2b255ef0b2b..023f4648818e 100644
--- a/components/camel-file/pom.xml
+++ b/components/camel-file/pom.xml
@@ -53,6 +53,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
index 77f0ba55fe01..1a2c70387143 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java
@@ -68,4 +68,13 @@ final class FileLockClusterLeaderInfo {
public int hashCode() {
return Objects.hashCode(id);
}
+
+ @Override
+ public String toString() {
+ return "FileLockClusterLeaderInfo{" +
+ "id='" + id + '\'' +
+ ", heartbeatUpdateIntervalMilliseconds=" +
heartbeatUpdateIntervalMilliseconds +
+ ", heartbeatMilliseconds=" + heartbeatMilliseconds +
+ '}';
+ }
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
index e21568925f0a..3851aea2cdd6 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.file.cluster;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -32,6 +33,10 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
private TimeUnit acquireLockIntervalUnit;
private ScheduledExecutorService executor;
private int heartbeatTimeoutMultiplier;
+ private int clusterDataTaskMaxAttempts;
+ private long clusterDataTaskTimeout;
+ private TimeUnit clusterDataTaskTimeoutUnit;
+ private ExecutorService clusterDataTaskExecutor;
public FileLockClusterService() {
this.acquireLockDelay = 1;
@@ -39,6 +44,9 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
this.acquireLockInterval = 10;
this.acquireLockIntervalUnit = TimeUnit.SECONDS;
this.heartbeatTimeoutMultiplier = 5;
+ this.clusterDataTaskMaxAttempts = 5;
+ this.clusterDataTaskTimeout = 10;
+ this.clusterDataTaskTimeoutUnit = TimeUnit.SECONDS;
}
@Override
@@ -120,6 +128,9 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
* <p>
*/
public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+ if (heartbeatTimeoutMultiplier <= 0) {
+ throw new IllegalArgumentException("HeartbeatTimeoutMultiplier
must be greater than 0");
+ }
this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
}
@@ -127,6 +138,64 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
return heartbeatTimeoutMultiplier;
}
+ /**
+ * Sets how many times a cluster data task will run, counting both the
first execution and subsequent retries in
+ * case of failure or timeout. The default is 5 attempts.
+ * <p>
+ * This can be useful when the cluster data root is on network based file
storage, where I/O operations may
+ * occasionally block for long or unpredictable periods.
+ */
+ public void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) {
+ if (clusterDataTaskMaxAttempts <= 0) {
+ throw new IllegalArgumentException("clusterDataTaskMaxRetries must
be greater than 0");
+ }
+ this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts;
+ }
+
+ public int getClusterDataTaskMaxAttempts() {
+ return clusterDataTaskMaxAttempts;
+ }
+
+ /**
+ * Sets the timeout for a cluster data task (reading or writing cluster
data). The default is 10 seconds.
+ * <p>
+ * Timeouts are useful when the cluster data root is on network storage,
where I/O operations may occasionally block
+ * for long or unpredictable periods.
+ */
+ public void setClusterDataTaskTimeout(long clusterDataTaskTimeout) {
+ if (clusterDataTaskTimeout <= 0) {
+ throw new IllegalArgumentException("clusterDataTaskMaxRetries must
be greater than 0");
+ }
+ this.clusterDataTaskTimeout = clusterDataTaskTimeout;
+ }
+
+ public long getClusterDataTaskTimeout() {
+ return clusterDataTaskTimeout;
+ }
+
+ /**
+ * The time unit for the clusterDataTaskTimeout, defaults to
TimeUnit.SECONDS.
+ */
+ public void setClusterDataTaskTimeoutUnit(TimeUnit
clusterDataTaskTimeoutUnit) {
+ this.clusterDataTaskTimeoutUnit = clusterDataTaskTimeoutUnit;
+ }
+
+ public TimeUnit getClusterDataTaskTimeoutUnit() {
+ return clusterDataTaskTimeoutUnit;
+ }
+
+ /**
+ * Sets the timeout and its time unit for a cluster data task (reading or writing cluster
data). The default is 10 seconds.
+ * <p>
+ * Timeouts are useful when the cluster data root is on network storage,
where I/O operations may occasionally block
+ * for long or unpredictable periods.
+ * <p>
+ */
+ public void setClusterDataTaskTimeout(long clusterDataTaskTimeout,
TimeUnit clusterDataTaskTimeoutUnit) {
+ setClusterDataTaskTimeout(clusterDataTaskTimeout);
+ setClusterDataTaskTimeoutUnit(clusterDataTaskTimeoutUnit);
+ }
+
@Override
protected void doStop() throws Exception {
super.doStop();
@@ -142,6 +211,14 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
executor = null;
}
+
+ if (clusterDataTaskExecutor != null) {
+ if (context != null) {
+
context.getExecutorServiceManager().shutdown(clusterDataTaskExecutor);
+ } else {
+ clusterDataTaskExecutor.shutdown();
+ }
+ }
}
ScheduledExecutorService getExecutor() {
@@ -161,4 +238,19 @@ public class FileLockClusterService extends
AbstractCamelClusterService<FileLock
internalLock.unlock();
}
}
+
+ ExecutorService getClusterDataTaskExecutor() {
+ Lock internalLock = getInternalLock();
+ internalLock.lock();
+ try {
+ if (clusterDataTaskExecutor == null) {
+ final CamelContext context =
ObjectHelper.notNull(getCamelContext(), "CamelContext");
+ clusterDataTaskExecutor =
context.getExecutorServiceManager().newFixedThreadPool(this,
+ "FileLockClusterDataTask-" + getId(), 5);
+ }
+ return clusterDataTaskExecutor;
+ } finally {
+ internalLock.unlock();
+ }
+ }
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
new file mode 100644
index 000000000000..cc7932989793
--- /dev/null
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes cluster data read / write tasks asynchronously, with timeouts to
guard against potential unpredictable
+ * blocking I/O periods.
+ */
+class FileLockClusterTaskExecutor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileLockClusterTaskExecutor.class);
+ private final FileLockClusterService service;
+
+ FileLockClusterTaskExecutor(FileLockClusterService service) {
+ Objects.requireNonNull(service, "FileLockClusterService cannot be
null");
+ this.service = service;
+ }
+
+ /**
+ * If the cluster data root is network based, like an NFS mount, avoid
potential long blocking I/O to fail fast and
+ * reliably reason about the cluster state.
+ *
+ * @param task Supplier representing a task to run
+ */
+ <T> T run(Supplier<T> task) throws ExecutionException, TimeoutException {
+ Objects.requireNonNull(task, "Task cannot be null");
+
+ int maxAttempts = service.getClusterDataTaskMaxAttempts();
+ for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+ LOGGER.debug("Running cluster task attempt {} of {}", attempt,
maxAttempts);
+
+ CompletableFuture<T> future = CompletableFuture.supplyAsync(task,
service.getClusterDataTaskExecutor());
+ try {
+ return future.get(service.getClusterDataTaskTimeout(),
service.getClusterDataTaskTimeoutUnit());
+ } catch (InterruptedException e) {
+ LOGGER.trace("Cluster task interrupted on attempt {} of {}",
attempt, maxAttempts);
+ future.cancel(true);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (ExecutionException | TimeoutException e) {
+ LOGGER.debug("Cluster task encountered an exception on attempt
{} of {}", attempt, maxAttempts, e);
+ future.cancel(true);
+ if (attempt == maxAttempts) {
+ LOGGER.debug("Cluster task retry limit ({}) reached",
maxAttempts, e);
+ throw e;
+ }
+ } finally {
+ LOGGER.debug("Cluster task attempt {} ended", attempt);
+ }
+ }
+ return null;
+ }
+}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
index fd3a7982cf9f..5a7466ceb622 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java
@@ -62,7 +62,7 @@ final class FileLockClusterUtils {
FileChannel channel,
FileLockClusterLeaderInfo clusterLeaderInfo,
boolean forceMetaData)
- throws IOException {
+ throws Exception {
Objects.requireNonNull(channel, "channel cannot be null");
Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be
null");
@@ -100,7 +100,7 @@ final class FileLockClusterUtils {
* inconsistent state
* @throws IOException If reading the lock file failed
*/
- static FileLockClusterLeaderInfo readClusterLeaderInfo(Path
leaderDataPath) throws IOException {
+ static FileLockClusterLeaderInfo readClusterLeaderInfo(Path
leaderDataPath) throws Exception {
try {
byte[] bytes = Files.readAllBytes(leaderDataPath);
@@ -119,7 +119,7 @@ final class FileLockClusterUtils {
long lastHeartbeat = buf.getLong();
return new FileLockClusterLeaderInfo(uuidStr, intervalMillis,
lastHeartbeat);
- } catch (NoSuchFileException e) {
+ } catch (FileNotFoundException | NoSuchFileException e) {
// Handle NoSuchFileException to give the ClusterView a chance to
recreate the leadership data
return null;
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index 8156f00cfaff..80896e1f1c32 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -29,14 +29,18 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.apache.camel.util.function.ThrowingHelper;
+import org.apache.camel.util.function.ThrowingSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +61,7 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
private ScheduledFuture<?> task;
private int heartbeatTimeoutMultiplier;
private long acquireLockIntervalMilliseconds;
+ private FileLockClusterTaskExecutor clusterTaskExecutor;
FileLockClusterView(FileLockClusterService cluster, String namespace) {
super(cluster, namespace);
@@ -88,6 +93,8 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
@Override
protected void doStart() throws Exception {
+ FileLockClusterService service =
getClusterService().unwrap(FileLockClusterService.class);
+
// Start critical section
try {
contextStartLock.lock();
@@ -97,31 +104,42 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
fireLeadershipChangedEvent((CamelClusterMember) null);
}
- if (!Files.exists(leaderLockPath.getParent())) {
- Files.createDirectories(leaderLockPath.getParent());
+ // Attempt to pre-create cluster data files & directories. On
failure, it will either be attempted by another cluster member or run again
within the tryLock task loop
+ try {
+ if (!Files.exists(leaderLockPath.getParent())) {
+ Files.createDirectories(leaderLockPath.getParent());
+ }
+ } catch (IOException e) {
+ LOGGER.debug("Error creating directory {}",
leaderLockPath.getParent(), e);
}
- if (!Files.exists(leaderLockPath)) {
- Files.createFile(leaderLockPath);
+ try {
+ if (!Files.exists(leaderLockPath)) {
+ Files.createFile(leaderLockPath);
+ }
+ } catch (IOException e) {
+ LOGGER.debug("Error creating cluster leader lock file {}",
leaderLockPath, e);
}
- if (!Files.exists(leaderDataPath)) {
- Files.createFile(leaderDataPath);
+ try {
+ if (!Files.exists(leaderDataPath)) {
+ Files.createFile(leaderDataPath);
+ }
+ } catch (IOException e) {
+ LOGGER.debug("Error creating cluster leader data file {}",
leaderDataPath, e);
}
} finally {
// End critical section
contextStartLock.unlock();
}
- FileLockClusterService service =
getClusterService().unwrap(FileLockClusterService.class);
+ clusterTaskExecutor = new FileLockClusterTaskExecutor(service);
+
acquireLockIntervalMilliseconds = TimeUnit.MILLISECONDS.convert(
service.getAcquireLockInterval(),
service.getAcquireLockIntervalUnit());
heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier();
- if (heartbeatTimeoutMultiplier <= 0) {
- throw new IllegalArgumentException("HeartbeatTimeoutMultiplier
must be greater than 0");
- }
ScheduledExecutorService executor = service.getExecutor();
task = executor.scheduleWithFixedDelay(this::tryLock,
@@ -135,14 +153,20 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
@Override
protected void doStop() throws Exception {
if (localMember.isLeader() && leaderDataFile != null) {
- try {
- FileChannel channel = leaderDataFile.getChannel();
- channel.truncate(0);
- channel.force(true);
- } catch (Exception e) {
- // Log and ignore since we need to release the file lock and
do cleanup
- LOGGER.debug("Failed to truncate {} on {} stop",
leaderDataPath, getClass().getSimpleName(), e);
- }
+ clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<Void, Throwable>() {
+ @Override
+ public Void get() throws Throwable {
+ try {
+ FileChannel channel = leaderDataFile.getChannel();
+ channel.truncate(0);
+ channel.force(true);
+ } catch (Exception e) {
+ // Log and ignore since we need to release the file
lock and do cleanup
+ LOGGER.debug("Failed to truncate {} on {} stop",
leaderDataPath, getClass().getSimpleName(), e);
+ }
+ return null;
+ }
+ }));
}
closeInternal();
@@ -201,7 +225,7 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
// Update the cluster data file with the leader state
so that other cluster members can interrogate it
writeClusterLeaderInfo(false);
return;
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.debug("Failed writing cluster leader data to
{}", leaderDataPath, e);
}
}
@@ -212,6 +236,7 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
localMember.getUuid());
localMember.setStatus(ClusterMemberStatus.FOLLOWER);
fireLeadershipChangedEvent((CamelClusterMember) null);
+ clusterLeaderInfoRef.set(null);
releaseFileLock();
closeLockFiles();
lock = null;
@@ -223,9 +248,8 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
// Get & update cluster leader state
LOGGER.debug("Reading cluster leader state from {}",
leaderDataPath);
- FileLockClusterLeaderInfo latestClusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+ FileLockClusterLeaderInfo latestClusterLeaderInfo =
readClusterLeaderInfo();
FileLockClusterLeaderInfo previousClusterLeaderInfo =
clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo);
-
// Check if we can attempt to take cluster leadership
if (isLeaderStale(latestClusterLeaderInfo,
previousClusterLeaderInfo)
|| canReclaimLeadership(latestClusterLeaderInfo)) {
@@ -235,19 +259,17 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
}
// Try to recreate the cluster data directory in case it
got removed
- if (!Files.exists(leaderLockPath.getParent())) {
- Files.createDirectories(leaderLockPath.getParent());
- }
+ createClusterRootDirectoryIfRequired();
// Attempt to obtain cluster leadership
LOGGER.debug("Try to acquire a lock on {}
(cluster-member-id={})", leaderLockPath, localMember.getUuid());
- leaderLockFile = new
RandomAccessFile(leaderLockPath.toFile(), "rw");
- leaderDataFile = new
RandomAccessFile(leaderDataPath.toFile(), "rw");
+ leaderLockFile = createRandomAccessFile(leaderLockPath);
+ leaderDataFile = createRandomAccessFile(leaderDataPath);
lock = null;
lock = leaderLockFile.getChannel().tryLock(0, Math.max(1,
leaderLockFile.getChannel().size()), false);
- if (lock != null) {
+ if (lockIsValid()) {
LOGGER.info("Lock on file {} acquired (lock={},
cluster-member-id={})", leaderLockPath, lock,
localMember.getUuid());
localMember.setStatus(ClusterMemberStatus.LEADER);
@@ -286,28 +308,67 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
return leaderInfo != null &&
localMember.getUuid().equals(leaderInfo.getId());
}
- void writeClusterLeaderInfo(boolean forceMetaData) throws IOException {
+ void createClusterRootDirectoryIfRequired() throws ExecutionException,
TimeoutException {
+ clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<Void, Throwable>() {
+ @Override
+ public Void get() throws Throwable {
+ if (!Files.exists(leaderLockPath.getParent())) {
+ Files.createDirectories(leaderLockPath.getParent());
+ }
+ return null;
+ }
+ }));
+ }
+
+ RandomAccessFile createRandomAccessFile(Path path) throws
ExecutionException, TimeoutException {
+ return clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<RandomAccessFile, Throwable>() {
+ @Override
+ public RandomAccessFile get() throws Throwable {
+ return new RandomAccessFile(path.toFile(), "rw");
+ }
+ }));
+ }
+
+ FileLockClusterLeaderInfo readClusterLeaderInfo() throws Exception {
+ return clusterTaskExecutor
+ .run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<FileLockClusterLeaderInfo, Throwable>() {
+ @Override
+ public FileLockClusterLeaderInfo get() throws Throwable {
+ return
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+ }
+ }));
+ }
+
+ void writeClusterLeaderInfo(boolean forceMetaData) throws Exception {
FileLockClusterLeaderInfo latestClusterLeaderInfo = new
FileLockClusterLeaderInfo(
localMember.getUuid(),
acquireLockIntervalMilliseconds,
System.currentTimeMillis());
- FileLockClusterUtils.writeClusterLeaderInfo(
- leaderDataPath,
- leaderDataFile.getChannel(),
- latestClusterLeaderInfo,
- forceMetaData);
+ clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<Void, Throwable>() {
+ @Override
+ public Void get() throws Throwable {
+ FileLockClusterUtils.writeClusterLeaderInfo(
+ leaderDataPath,
+ leaderDataFile.getChannel(),
+ latestClusterLeaderInfo,
+ forceMetaData);
+ return null;
+ }
+ }));
}
boolean isLeaderInternal() {
if (localMember.isLeader()) {
try {
- FileLockClusterLeaderInfo leaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
+ FileLockClusterLeaderInfo leaderInfo = readClusterLeaderInfo();
+ boolean leaderStale = isLeaderStale(leaderInfo,
clusterLeaderInfoRef.getAndSet(leaderInfo));
+ LOGGER.debug("Leader read cluster data {}, isStale={}",
leaderInfo, leaderStale);
+
return leaderInfo != null
- && lock != null
- && lock.isValid()
+ && !leaderStale
&& localMember.getUuid().equals(leaderInfo.getId())
- && Files.exists(leaderLockPath);
+ && lockIsValid();
} catch (Exception e) {
LOGGER.debug("Failed to read {} (cluster-member-id={})",
leaderLockPath, localMember.getUuid(), e);
return false;
@@ -316,6 +377,18 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
return false;
}
+ boolean lockIsValid() throws ExecutionException, TimeoutException {
+ if (lock != null && lock.isValid()) {
+ return clusterTaskExecutor.run(ThrowingHelper.wrapAsSupplier(new
ThrowingSupplier<Boolean, Throwable>() {
+ @Override
+ public Boolean get() throws Throwable {
+ return Files.exists(leaderLockPath);
+ }
+ }));
+ }
+ return false;
+ }
+
private final class ClusterMember implements CamelClusterMember {
private final AtomicReference<ClusterMemberStatus> status = new
AtomicReference<>(ClusterMemberStatus.STOPPED);
private final String uuid = UUID.randomUUID().toString();
diff --git
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
new file mode 100644
index 000000000000..0b086b3ee1a3
--- /dev/null
+++
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class FileLockClusterTaskExecutorTest {
+ @Test
+ void runTaskWithDefaultMaxAttemptsAndTimeout() throws ExecutionException,
TimeoutException {
+ FileLockClusterService service = new FileLockClusterService();
+ service.setCamelContext(new DefaultCamelContext());
+
+ FileLockClusterTaskExecutor executor = new
FileLockClusterTaskExecutor(service);
+
+ String message = "Hello World";
+ String result = executor.run(new Supplier<String>() {
+ @Override
+ public String get() {
+ return message;
+ }
+ });
+
+ Assertions.assertEquals(message, result);
+ }
+
+ @Test
+ void runTaskWithMaxAttemptsExceeded() {
+ int maxAttempts = 3;
+ int timeoutMs = 100;
+
+ FileLockClusterService service = new FileLockClusterService();
+ service.setCamelContext(new DefaultCamelContext());
+ service.setClusterDataTaskMaxAttempts(maxAttempts);
+ service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+
+ FileLockClusterTaskExecutor executor = new
FileLockClusterTaskExecutor(service);
+
+ AtomicInteger count = new AtomicInteger();
+ String message = "Hello World";
+
+ Assertions.assertThrows(TimeoutException.class, () -> {
+ executor.run(new Supplier<String>() {
+ @Override
+ public String get() {
+ count.incrementAndGet();
+ try {
+ Thread.sleep(timeoutMs + 50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return message;
+ }
+ });
+ });
+
+ Assertions.assertEquals(3, count.get());
+ }
+
+ @Test
+ void runTaskWithMaxAttemptsNotExceeded() throws ExecutionException,
TimeoutException {
+ int maxAttempts = 3;
+ int timeoutMs = 100;
+
+ FileLockClusterService service = new FileLockClusterService();
+ service.setCamelContext(new DefaultCamelContext());
+ service.setClusterDataTaskMaxAttempts(maxAttempts);
+ service.setClusterDataTaskTimeout(timeoutMs, TimeUnit.MILLISECONDS);
+
+ FileLockClusterTaskExecutor executor = new
FileLockClusterTaskExecutor(service);
+
+ AtomicInteger count = new AtomicInteger();
+ String message = "Hello World";
+
+ String result = executor.run(new Supplier<String>() {
+ @Override
+ public String get() {
+ if (count.incrementAndGet() < 3) {
+ try {
+ Thread.sleep(timeoutMs + 50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return message;
+ }
+ });
+
+ Assertions.assertEquals(3, count.get());
+ Assertions.assertEquals(message, result);
+ }
+}
diff --git
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
index 3e6f9db8a578..28fb79fc2923 100644
---
a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
+++
b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.file.cluster;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -171,7 +170,7 @@ class FileLockClusterUtilsTest {
}
@Test
- void writeClusterLeaderInfoData(@TempDir Path tempDir) throws IOException {
+ void writeClusterLeaderInfoData(@TempDir Path tempDir) throws Exception {
Path clusterData = tempDir.resolve("leader.dat");
try (RandomAccessFile raf = new RandomAccessFile(clusterData.toFile(),
"rw")) {
FileLockClusterLeaderInfo leaderInfo = new
FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 2L);
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
index c335681eca85..29c9a60536bf 100644
---
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -258,94 +258,6 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
}
}
- @Test
- void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir
Path clusterMovedLocation) throws Exception {
- ClusterConfig leaderConfig = new ClusterConfig();
- leaderConfig.setTimerRepeatCount(-1);
-
- CamelContext clusterLeader = createCamelContext(leaderConfig);
-
- ClusterConfig followerConfig = new ClusterConfig();
- followerConfig.setTimerRepeatCount(-1);
- followerConfig.setAcquireLockDelay(2);
-
- CamelContext clusterFollower = createCamelContext(followerConfig);
-
- try {
- MockEndpoint mockEndpointLeader =
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
- mockEndpointLeader.expectedMessageCount(5);
-
- clusterLeader.start();
- clusterFollower.start();
-
- mockEndpointLeader.assertIsSatisfied();
-
- AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new
AtomicReference<>();
- Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(()
-> {
- assertTrue(Files.exists(lockFile));
- assertTrue(Files.exists(dataFile));
- assertTrue(getClusterMember(clusterLeader).isLeader());
-
- FileLockClusterLeaderInfo clusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
- assertNotNull(clusterLeaderInfo);
- leaderInfo.set(clusterLeaderInfo);
-
- String leaderId = clusterLeaderInfo.getId();
- assertNotNull(leaderId);
- assertDoesNotThrow(() -> UUID.fromString(leaderId));
- });
-
- // Wait enough time for the follower to have run its lock
acquisition scheduled task
- Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
-
- // The follower should not have produced any messages
- MockEndpoint mockEndpointFollower =
clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
- assertTrue(mockEndpointFollower.getExchanges().isEmpty());
-
- mockEndpointLeader.reset();
- mockEndpointLeader.expectedMinimumMessageCount(1);
-
- // Simulate the file system becoming detached by moving the
cluster data directory
- Files.move(clusterDir, clusterMovedLocation,
StandardCopyOption.REPLACE_EXISTING);
-
- // Simulate reattaching the file system by moving the cluster
directory back to the original location
- try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
- stream.forEach(path -> {
- try {
- Path destination =
clusterDir.resolve(clusterMovedLocation.relativize(path));
- if (Files.isDirectory(path)) {
- Files.createDirectories(destination);
- } else {
- Files.copy(path, destination,
StandardCopyOption.REPLACE_EXISTING);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- FileLockClusterLeaderInfo updatedInfo
- = new FileLockClusterLeaderInfo(
- leaderInfo.get().getId(),
TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis());
- Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
- try (RandomAccessFile file = new RandomAccessFile(data.toFile(),
"rw")) {
- FileLockClusterUtils.writeClusterLeaderInfo(data,
file.getChannel(), updatedInfo,
- true);
- }
-
- // Since the lock file is not considered 'stale', the original
leader should resume leadership
- Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(()
-> {
- assertTrue(getClusterMember(clusterLeader).isLeader());
- mockEndpointLeader.assertIsSatisfied();
- });
-
- assertTrue(mockEndpointFollower.getExchanges().isEmpty());
- } finally {
- clusterLeader.stop();
- clusterFollower.stop();
- }
- }
-
@Test
void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path
clusterMovedLocation) throws Exception {
ClusterConfig leaderConfig = new ClusterConfig();
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
index 72b49bc1b486..5bb4da929715 100644
---
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
@@ -26,7 +26,6 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
-import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -221,28 +220,68 @@ class FileLockClusterServiceBasicFailoverTest extends
FileLockClusterServiceTest
}
@Test
- void negativeHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+ void negativeHeartbeatTimeoutMultiplierThrowsException() {
ClusterConfig config = new ClusterConfig();
config.setHeartbeatTimeoutMultiplier(-1);
-
- Exception exception = assertThrows(Exception.class, () -> {
+ assertThrows(IllegalArgumentException.class, () -> {
try (CamelContext camelContext = createCamelContext(config)) {
camelContext.start();
}
});
- assertIsInstanceOf(IllegalArgumentException.class,
exception.getCause());
}
@Test
- void zeroHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+ void zeroHeartbeatTimeoutMultiplierThrowsException() {
ClusterConfig config = new ClusterConfig();
config.setHeartbeatTimeoutMultiplier(0);
+ assertThrows(IllegalArgumentException.class, () -> {
+ try (CamelContext camelContext = createCamelContext(config)) {
+ camelContext.start();
+ }
+ });
+ }
+
+ @Test
+ void negativeClusterDataTaskMaxAttemptsThrowsException() {
+ ClusterConfig config = new ClusterConfig();
+ config.setClusterDataTaskMaxAttempts(-1);
+ assertThrows(IllegalArgumentException.class, () -> {
+ try (CamelContext camelContext = createCamelContext(config)) {
+ camelContext.start();
+ }
+ });
+ }
+
+ @Test
+ void zeroClusterDataTaskMaxAttemptsThrowsException() {
+ ClusterConfig config = new ClusterConfig();
+ config.setClusterDataTaskMaxAttempts(0);
+ assertThrows(IllegalArgumentException.class, () -> {
+ try (CamelContext camelContext = createCamelContext(config)) {
+ camelContext.start();
+ }
+ });
+ }
+
+ @Test
+ void negativeClusterDataTaskTimeoutThrowsException() {
+ ClusterConfig config = new ClusterConfig();
+ config.setClusterDataTaskTimeout(-1);
+ assertThrows(IllegalArgumentException.class, () -> {
+ try (CamelContext camelContext = createCamelContext(config)) {
+ camelContext.start();
+ }
+ });
+ }
- Exception exception = assertThrows(Exception.class, () -> {
+ @Test
+ void zeroClusterDataTaskTimeoutThrowsException() {
+ ClusterConfig config = new ClusterConfig();
+ config.setClusterDataTaskTimeout(0);
+ assertThrows(IllegalArgumentException.class, () -> {
try (CamelContext camelContext = createCamelContext(config)) {
camelContext.start();
}
});
- assertIsInstanceOf(IllegalArgumentException.class,
exception.getCause());
}
}
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
index 72290aaacbbb..b7f7bfbbde55 100644
---
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
@@ -67,6 +67,8 @@ abstract class FileLockClusterServiceTestBase {
service.setAcquireLockInterval(1);
service.setRoot(clusterDir.toString());
service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier());
+
service.setClusterDataTaskMaxAttempts(config.getClusterDataTaskMaxAttempts());
+ service.setClusterDataTaskTimeout(config.getClusterDataTaskTimeout());
return service;
}
@@ -83,6 +85,8 @@ abstract class FileLockClusterServiceTestBase {
private long acquireLockDelay = 1;
private long timerRepeatCount = 5;
private int heartbeatTimeoutMultiplier = 5;
+ private int clusterDataTaskMaxAttempts = 5;
+ private long clusterDataTaskTimeout = 5;
long getAcquireLockDelay() {
return acquireLockDelay;
@@ -104,12 +108,28 @@ abstract class FileLockClusterServiceTestBase {
return TimeUnit.SECONDS.toMillis(getAcquireLockDelay()) + 500;
}
- public int getHeartbeatTimeoutMultiplier() {
+ int getHeartbeatTimeoutMultiplier() {
return heartbeatTimeoutMultiplier;
}
- public void setHeartbeatTimeoutMultiplier(int
heartbeatTimeoutMultiplier) {
+ void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
}
+
+ int getClusterDataTaskMaxAttempts() {
+ return this.clusterDataTaskMaxAttempts;
+ }
+
+ void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) {
+ this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts;
+ }
+
+ long getClusterDataTaskTimeout() {
+ return clusterDataTaskTimeout;
+ }
+
+ void setClusterDataTaskTimeout(long clusterDataTaskTimeout) {
+ this.clusterDataTaskTimeout = clusterDataTaskTimeout;
+ }
}
}
diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc
b/docs/user-manual/modules/ROOT/pages/clustering.adoc
index 669a56d13216..322443e064c1 100644
--- a/docs/user-manual/modules/ROOT/pages/clustering.adoc
+++ b/docs/user-manual/modules/ROOT/pages/clustering.adoc
@@ -65,6 +65,9 @@ Configuration options:
| acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS |
TimeUnit
| acquireLockInterval | The time to wait between attempts to try to acquire
the cluster lock | 10 | long
| acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS |
TimeUnit
+| clusterDataTaskMaxAttempts | Sets how many times a cluster data task will
run, counting both the first execution and subsequent retries in case of
failure or timeout. This can be useful when the cluster data root is on network
based file storage, where I/O operations may occasionally block for long or
unpredictable periods | 5 | int
+| clusterDataTaskTimeout | Sets the timeout for a cluster data task (reading
or writing cluster data). Timeouts are useful when the cluster data root is on
network storage, where I/O operations may occasionally block for long or
unpredictable periods | 10 | long
+| clusterDataTaskTimeoutUnit | The time unit for
clusterDataTaskTimeout | SECONDS | TimeUnit
| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader
acquireLockInterval to determine how long followers should wait before
considering the leader "stale". For example, if the leader updates its
heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers
will tolerate up to 2s * 3 = 6s of silence before declaring the leader
unavailable | 5 | int
| rootPath | The file cluster root directory path | | String
|===