This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ee463d32f Replacing CompletableFuture.supplyAsync() with fixed thread
pool executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool,
which may not have enough threads. (#7908)
0ee463d32f is described below
commit 0ee463d32f58961da2563660012e0f232caba9a0
Author: Ventsislav Marinov <[email protected]>
AuthorDate: Fri Aug 15 09:00:54 2025 -0400
Replacing CompletableFuture.supplyAsync() with fixed thread pool executor.
CompletableFuture.supplyAsync() uses the common ForkJoinPool, which may not
have enough threads. (#7908)
* Those tests have a race condition: it assumes all tasks start before
await() times out — which is not guaranteed.
* Replacing CompletableFuture.supplyAsync() with fixed thread pool
executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which
may not have enough threads.
* Replacing CompletableFuture.supplyAsync() with fixed thread pool
executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which
may not have enough threads.
---------
Co-authored-by: VENTSISLAV MARINOV <[email protected]>
---
.../internal/WanCopyRegionFunctionServiceTest.java | 74 ++++++++++++----------
1 file changed, 40 insertions(+), 34 deletions(-)
diff --git
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
index 1b5a89821b..f9649fd15f 100644
---
a/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
+++
b/geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java
@@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -115,49 +117,52 @@ public class WanCopyRegionFunctionServiceTest {
@Test
public void
cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() {
- CountDownLatch latch = new CountDownLatch(2);
+ int executions = 2;
+ CountDownLatch latch = new CountDownLatch(executions);
+ ExecutorService executorService = Executors.newFixedThreadPool(executions);
Callable<CliFunctionResult> firstExecution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
return null;
};
- CompletableFuture
- .supplyAsync(() -> {
- try {
- return service.execute(firstExecution, "myRegion", "mySender1");
- } catch (Exception e) {
- return null;
- }
- });
+ executorService.submit(() -> {
+ try {
+ return service.execute(firstExecution, "myRegion", "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
Callable<CliFunctionResult> secondExecution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
return null;
};
- CompletableFuture
- .supplyAsync(() -> {
- try {
- return service.execute(secondExecution, "myRegion", "mySender");
- } catch (Exception e) {
- return null;
- }
- });
+ executorService.submit(() -> {
+ try {
+ return service.execute(secondExecution, "myRegion", "mySender");
+ } catch (Exception e) {
+ return null;
+ }
+ });
// Wait for the functions to start execution
- await().untilAsserted(() ->
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2));
+ await().untilAsserted(
+ () ->
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
// Cancel the function execution
String executionsString = service.cancelAll();
assertThat(executionsString).isEqualTo("[(myRegion,mySender1),
(myRegion,mySender)]");
await().untilAsserted(() ->
assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
+ executorService.shutdown();
}
@Test
public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
int executions = 5;
CountDownLatch latch = new CountDownLatch(executions);
+ ExecutorService executorService = Executors.newFixedThreadPool(executions);
for (int i = 0; i < executions; i++) {
Callable<CliFunctionResult> execution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(),
TimeUnit.SECONDS);
@@ -165,14 +170,13 @@ public class WanCopyRegionFunctionServiceTest {
};
final String regionName = String.valueOf(i);
- CompletableFuture
- .supplyAsync(() -> {
- try {
- return service.execute(execution, regionName, "mySender1");
- } catch (Exception e) {
- return null;
- }
- });
+ executorService.submit(() -> {
+ try {
+ return service.execute(execution, regionName, "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
}
// Wait for the functions to start execution
@@ -183,6 +187,7 @@ public class WanCopyRegionFunctionServiceTest {
for (int i = 0; i < executions; i++) {
latch.countDown();
}
+ executorService.shutdown();
}
@Test
@@ -193,6 +198,7 @@ public class WanCopyRegionFunctionServiceTest {
int executions = 4;
CountDownLatch latch = new CountDownLatch(executions);
AtomicInteger concurrentExecutions = new AtomicInteger(0);
+ ExecutorService executorService = Executors.newFixedThreadPool(executions);
for (int i = 0; i < executions; i++) {
Callable<CliFunctionResult> execution = () -> {
concurrentExecutions.incrementAndGet();
@@ -202,14 +208,13 @@ public class WanCopyRegionFunctionServiceTest {
};
final String regionName = String.valueOf(i);
- CompletableFuture
- .supplyAsync(() -> {
- try {
- return service.execute(execution, regionName, "mySender1");
- } catch (Exception e) {
- return null;
- }
- });
+ executorService.submit(() -> {
+ try {
+ return service.execute(execution, regionName, "mySender1");
+ } catch (Exception e) {
+ return null;
+ }
+ });
}
// Wait for the functions to start execution
@@ -225,6 +230,7 @@ public class WanCopyRegionFunctionServiceTest {
}
await().untilAsserted(() ->
assertThat(concurrentExecutions.get()).isEqualTo(0));
+ executorService.shutdown();
}
}