This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ee463d32f Replacing CompletableFuture.supplyAsync() with fixed thread 
pool executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, 
which may not have enough threads. (#7908)
0ee463d32f is described below

commit 0ee463d32f58961da2563660012e0f232caba9a0
Author: Ventsislav Marinov <[email protected]>
AuthorDate: Fri Aug 15 09:00:54 2025 -0400

    Replacing CompletableFuture.supplyAsync() with fixed thread pool executor. 
CompletableFuture.supplyAsync() uses the common ForkJoinPool, which may not 
have enough threads. (#7908)
    
    * Those tests have a race condition: it assumes all tasks start before 
await() times out — which is not guaranteed.
    
    * Replacing CompletableFuture.supplyAsync() with fixed thread pool 
executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which 
may not have enough threads.
    
    * Replacing CompletableFuture.supplyAsync() with fixed thread pool 
executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which 
may not have enough threads.
    
    ---------
    
    Co-authored-by: VENTSISLAV MARINOV <[email protected]>
---
 .../internal/WanCopyRegionFunctionServiceTest.java | 74 ++++++++++++----------
 1 file changed, 40 insertions(+), 34 deletions(-)

diff --git 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
index 1b5a89821b..f9649fd15f 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -115,49 +117,52 @@ public class WanCopyRegionFunctionServiceTest {
 
   @Test
   public void 
cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() {
-    CountDownLatch latch = new CountDownLatch(2);
+    int executions = 2;
+    CountDownLatch latch = new CountDownLatch(executions);
+    ExecutorService executorService = Executors.newFixedThreadPool(executions);
     Callable<CliFunctionResult> firstExecution = () -> {
       latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
       return null;
     };
 
-    CompletableFuture
-        .supplyAsync(() -> {
-          try {
-            return service.execute(firstExecution, "myRegion", "mySender1");
-          } catch (Exception e) {
-            return null;
-          }
-        });
+    executorService.submit(() -> {
+      try {
+        return service.execute(firstExecution, "myRegion", "mySender1");
+      } catch (Exception e) {
+        return null;
+      }
+    });
 
     Callable<CliFunctionResult> secondExecution = () -> {
       latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
       return null;
     };
 
-    CompletableFuture
-        .supplyAsync(() -> {
-          try {
-            return service.execute(secondExecution, "myRegion", "mySender");
-          } catch (Exception e) {
-            return null;
-          }
-        });
+    executorService.submit(() -> {
+      try {
+        return service.execute(secondExecution, "myRegion", "mySender");
+      } catch (Exception e) {
+        return null;
+      }
+    });
 
     // Wait for the functions to start execution
-    await().untilAsserted(() -> 
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2));
+    await().untilAsserted(
+        () -> 
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
 
     // Cancel the function execution
     String executionsString = service.cancelAll();
 
     assertThat(executionsString).isEqualTo("[(myRegion,mySender1), 
(myRegion,mySender)]");
     await().untilAsserted(() -> 
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+    executorService.shutdown();
   }
 
   @Test
   public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
     int executions = 5;
     CountDownLatch latch = new CountDownLatch(executions);
+    ExecutorService executorService = Executors.newFixedThreadPool(executions);
     for (int i = 0; i < executions; i++) {
       Callable<CliFunctionResult> execution = () -> {
         latch.await(GeodeAwaitility.getTimeout().getSeconds(), 
TimeUnit.SECONDS);
@@ -165,14 +170,13 @@ public class WanCopyRegionFunctionServiceTest {
       };
 
       final String regionName = String.valueOf(i);
-      CompletableFuture
-          .supplyAsync(() -> {
-            try {
-              return service.execute(execution, regionName, "mySender1");
-            } catch (Exception e) {
-              return null;
-            }
-          });
+      executorService.submit(() -> {
+        try {
+          return service.execute(execution, regionName, "mySender1");
+        } catch (Exception e) {
+          return null;
+        }
+      });
     }
 
     // Wait for the functions to start execution
@@ -183,6 +187,7 @@ public class WanCopyRegionFunctionServiceTest {
     for (int i = 0; i < executions; i++) {
       latch.countDown();
     }
+    executorService.shutdown();
   }
 
   @Test
@@ -193,6 +198,7 @@ public class WanCopyRegionFunctionServiceTest {
     int executions = 4;
     CountDownLatch latch = new CountDownLatch(executions);
     AtomicInteger concurrentExecutions = new AtomicInteger(0);
+    ExecutorService executorService = Executors.newFixedThreadPool(executions);
     for (int i = 0; i < executions; i++) {
       Callable<CliFunctionResult> execution = () -> {
         concurrentExecutions.incrementAndGet();
@@ -202,14 +208,13 @@ public class WanCopyRegionFunctionServiceTest {
       };
 
       final String regionName = String.valueOf(i);
-      CompletableFuture
-          .supplyAsync(() -> {
-            try {
-              return service.execute(execution, regionName, "mySender1");
-            } catch (Exception e) {
-              return null;
-            }
-          });
+      executorService.submit(() -> {
+        try {
+          return service.execute(execution, regionName, "mySender1");
+        } catch (Exception e) {
+          return null;
+        }
+      });
     }
 
     // Wait for the functions to start execution
@@ -225,6 +230,7 @@ public class WanCopyRegionFunctionServiceTest {
     }
 
     await().untilAsserted(() -> 
assertThat(concurrentExecutions.get()).isEqualTo(0));
+    executorService.shutdown();
   }
 
 }

Reply via email to