This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-4.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 33f20fa0bd020b758b1a98f2e5ba6616d6f138d7 Author: James Netherton <[email protected]> AuthorDate: Tue Dec 16 07:00:42 2025 +0000 CAMEL-22784: Recreate file lock cluster directory if deletion detected --- .../file/cluster/FileLockClusterView.java | 5 + ...FileLockClusterServiceAdvancedFailoverTest.java | 127 +++++++++++++++++++-- 2 files changed, 125 insertions(+), 7 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index 64701a750b85..5ce7c554ebc7 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -246,6 +246,11 @@ public class FileLockClusterView extends AbstractCamelClusterView { return; } + // Try to recreate the cluster data directory in case it got removed + if (!Files.exists(leaderLockPath.getParent())) { + Files.createDirectories(leaderLockPath.getParent()); + } + // Attempt to obtain cluster leadership LOGGER.debug("Try to acquire a lock on {} (cluster-member-id={})", leaderLockPath, localMember.getUuid()); leaderLockFile = new RandomAccessFile(leaderLockPath.toFile(), "rw"); diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java index 515cee417a84..c335681eca85 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.cluster; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; @@ -24,9 +25,11 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.camel.CamelContext; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.FileUtil; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; @@ -178,7 +181,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT } @Test - void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception { + void multipleClusterMembersReelectLeaderIfClusterDataDirectoryDeleted() throws Exception { ClusterConfig leaderConfig = new ClusterConfig(); leaderConfig.setTimerRepeatCount(-1); @@ -192,7 +195,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT try { MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class); - mockEndpointLeader.expectedMessageCount(5); + mockEndpointLeader.expectedMinimumMessageCount(1); clusterLeader.start(); clusterFollower.start(); @@ -222,16 +225,113 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT mockEndpointLeader.reset(); mockEndpointLeader.expectedMinimumMessageCount(1); - // Simulate the file system becoming detached by moving the cluster data directory - Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING); + // Delete the cluster data directory + FileUtil.removeDir(clusterDir.toFile()); // Wait for leadership to be relinquished Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { assertFalse(getClusterMember(clusterLeader).isLeader()); }); + // Wait for leadership to be gained by one of the members + CamelContext oldLeader = clusterLeader; + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + boolean newLeaderElected = false; + + // Original cluster leader regained leadership + if (getClusterMember(oldLeader).isLeader()) { + newLeaderElected = true; + mockEndpointLeader.assertIsSatisfied(); + } + + // A different cluster member gained leadership + if (getClusterMember(clusterFollower).isLeader()) { + newLeaderElected = true; + mockEndpointFollower.assertIsSatisfied(); + } + + assertTrue(newLeaderElected); + }); + } finally { + clusterLeader.stop(); + clusterFollower.stop(); + } + } + + @Test + void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception { + ClusterConfig leaderConfig = new ClusterConfig(); + leaderConfig.setTimerRepeatCount(-1); + + CamelContext clusterLeader = createCamelContext(leaderConfig); + + ClusterConfig followerConfig = new ClusterConfig(); + followerConfig.setTimerRepeatCount(-1); + followerConfig.setAcquireLockDelay(2); + + CamelContext clusterFollower = createCamelContext(followerConfig); + + try { + MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class); + mockEndpointLeader.expectedMessageCount(5); + + clusterLeader.start(); + clusterFollower.start(); + + mockEndpointLeader.assertIsSatisfied(); + + AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new AtomicReference<>(); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertTrue(Files.exists(lockFile)); + assertTrue(Files.exists(dataFile)); + assertTrue(getClusterMember(clusterLeader).isLeader()); + + FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile); + assertNotNull(clusterLeaderInfo); + leaderInfo.set(clusterLeaderInfo); + + String leaderId = clusterLeaderInfo.getId(); + assertNotNull(leaderId); + assertDoesNotThrow(() -> UUID.fromString(leaderId)); + }); + + // Wait enough time for the follower to have run its lock acquisition scheduled task + Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis()); + + // The follower should not have produced any messages + MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class); + assertTrue(mockEndpointFollower.getExchanges().isEmpty()); + + mockEndpointLeader.reset(); + mockEndpointLeader.expectedMinimumMessageCount(1); + + // Simulate the file system becoming detached by moving the cluster data directory + Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING); + // Simulate reattaching the file system by moving the cluster directory back to the original location - Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING); + try (Stream<Path> stream = Files.walk(clusterMovedLocation)) { + stream.forEach(path -> { + try { + Path destination = clusterDir.resolve(clusterMovedLocation.relativize(path)); + if (Files.isDirectory(path)) { + Files.createDirectories(destination); + } else { + Files.copy(path, destination, StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + FileLockClusterLeaderInfo updatedInfo + = new FileLockClusterLeaderInfo( + leaderInfo.get().getId(), TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis()); + Path data = clusterMovedLocation.resolve(NAMESPACE + ".data"); + try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) { + FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo, + true); + } // Since the lock file is not considered 'stale', the original leader should resume leadership Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { @@ -309,7 +409,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT FileLockClusterLeaderInfo updatedInfo = new FileLockClusterLeaderInfo( - leaderInfo.get().getId(), TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp); + leaderInfo.get().getId(), TimeUnit.MILLISECONDS.toMillis(2), staleHeartbeatTimestamp); Path data = clusterMovedLocation.resolve(NAMESPACE + ".data"); try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) { FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo, @@ -317,7 +417,20 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT } // Simulate reattaching the file system by moving the cluster directory back to the original location - Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING); + try (Stream<Path> stream = Files.walk(clusterMovedLocation)) { + stream.forEach(path -> { + try { + Path destination = clusterDir.resolve(clusterMovedLocation.relativize(path)); + if (Files.isDirectory(path)) { + Files.createDirectories(destination); + } else { + Files.copy(path, destination, StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } mockEndpointFollower.expectedMinimumMessageCount(1);
