This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 33f20fa0bd020b758b1a98f2e5ba6616d6f138d7
Author: James Netherton <[email protected]>
AuthorDate: Tue Dec 16 07:00:42 2025 +0000

    CAMEL-22784: Recreate file lock cluster directory if deletion detected
---
 .../file/cluster/FileLockClusterView.java          |   5 +
 ...FileLockClusterServiceAdvancedFailoverTest.java | 127 +++++++++++++++++++--
 2 files changed, 125 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index 64701a750b85..5ce7c554ebc7 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -246,6 +246,11 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                         return;
                     }
 
+                    // Try to recreate the cluster data directory in case it 
got removed
+                    if (!Files.exists(leaderLockPath.getParent())) {
+                        Files.createDirectories(leaderLockPath.getParent());
+                    }
+
                     // Attempt to obtain cluster leadership
                     LOGGER.debug("Try to acquire a lock on {} 
(cluster-member-id={})", leaderLockPath, localMember.getUuid());
                     leaderLockFile = new 
RandomAccessFile(leaderLockPath.toFile(), "rw");
diff --git 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
index 515cee417a84..c335681eca85 100644
--- 
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
+++ 
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file.cluster;
 
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -24,9 +25,11 @@ import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.FileUtil;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -178,7 +181,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
     }
 
     @Test
-    void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir 
Path clusterMovedLocation) throws Exception {
+    void multipleClusterMembersReelectLeaderIfClusterDataDirectoryDeleted() 
throws Exception {
         ClusterConfig leaderConfig = new ClusterConfig();
         leaderConfig.setTimerRepeatCount(-1);
 
@@ -192,7 +195,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
 
         try {
             MockEndpoint mockEndpointLeader = 
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
-            mockEndpointLeader.expectedMessageCount(5);
+            mockEndpointLeader.expectedMinimumMessageCount(1);
 
             clusterLeader.start();
             clusterFollower.start();
@@ -222,16 +225,113 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
             mockEndpointLeader.reset();
             mockEndpointLeader.expectedMinimumMessageCount(1);
 
-            // Simulate the file system becoming detached by moving the 
cluster data directory
-            Files.move(clusterDir, clusterMovedLocation, 
StandardCopyOption.REPLACE_EXISTING);
+            // Delete the cluster data directory
+            FileUtil.removeDir(clusterDir.toFile());
 
             // Wait for leadership to be relinquished
             Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() 
-> {
                 assertFalse(getClusterMember(clusterLeader).isLeader());
             });
 
+            // Wait for leadership to be gained by one of the members
+            CamelContext oldLeader = clusterLeader;
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() 
-> {
+                boolean newLeaderElected = false;
+
+                // Original cluster leader regained leadership
+                if (getClusterMember(oldLeader).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointLeader.assertIsSatisfied();
+                }
+
+                // A different cluster member gained leadership
+                if (getClusterMember(clusterFollower).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointFollower.assertIsSatisfied();
+                }
+
+                assertTrue(newLeaderElected);
+            });
+        } finally {
+            clusterLeader.stop();
+            clusterFollower.stop();
+        }
+    }
+
+    @Test
+    void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir 
Path clusterMovedLocation) throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = 
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointLeader.expectedMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new 
AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() 
-> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = 
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+                leaderInfo.set(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock 
acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = 
clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Simulate the file system becoming detached by moving the 
cluster data directory
+            Files.move(clusterDir, clusterMovedLocation, 
StandardCopyOption.REPLACE_EXISTING);
+
             // Simulate reattaching the file system by moving the cluster 
directory back to the original location
-            Files.move(clusterMovedLocation, clusterDir, 
StandardCopyOption.REPLACE_EXISTING);
+            try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
+                stream.forEach(path -> {
+                    try {
+                        Path destination = 
clusterDir.resolve(clusterMovedLocation.relativize(path));
+                        if (Files.isDirectory(path)) {
+                            Files.createDirectories(destination);
+                        } else {
+                            Files.copy(path, destination, 
StandardCopyOption.REPLACE_EXISTING);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
+
+            FileLockClusterLeaderInfo updatedInfo
+                    = new FileLockClusterLeaderInfo(
+                            leaderInfo.get().getId(), 
TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis());
+            Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
+            try (RandomAccessFile file = new RandomAccessFile(data.toFile(), 
"rw")) {
+                FileLockClusterUtils.writeClusterLeaderInfo(data, 
file.getChannel(), updatedInfo,
+                        true);
+            }
 
             // Since the lock file is not considered 'stale', the original 
leader should resume leadership
             Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() 
-> {
@@ -309,7 +409,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
 
             FileLockClusterLeaderInfo updatedInfo
                     = new FileLockClusterLeaderInfo(
-                            leaderInfo.get().getId(), 
TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp);
+                            leaderInfo.get().getId(), 
TimeUnit.MILLISECONDS.toMillis(2), staleHeartbeatTimestamp);
             Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
             try (RandomAccessFile file = new RandomAccessFile(data.toFile(), 
"rw")) {
                 FileLockClusterUtils.writeClusterLeaderInfo(data, 
file.getChannel(), updatedInfo,
@@ -317,7 +417,20 @@ class FileLockClusterServiceAdvancedFailoverTest extends 
FileLockClusterServiceT
             }
 
             // Simulate reattaching the file system by moving the cluster 
directory back to the original location
-            Files.move(clusterMovedLocation, clusterDir, 
StandardCopyOption.REPLACE_EXISTING);
+            try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
+                stream.forEach(path -> {
+                    try {
+                        Path destination = 
clusterDir.resolve(clusterMovedLocation.relativize(path));
+                        if (Files.isDirectory(path)) {
+                            Files.createDirectories(destination);
+                        } else {
+                            Files.copy(path, destination, 
StandardCopyOption.REPLACE_EXISTING);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+            }
 
             mockEndpointFollower.expectedMinimumMessageCount(1);
 

Reply via email to