This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c4ddb30bc Replaces AtomicBoolean with BooleanSupplier in fate 
(#6143)
8c4ddb30bc is described below

commit 8c4ddb30bc373920bf491018db8c349edfa0bfa2
Author: Keith Turner <[email protected]>
AuthorDate: Mon Feb 23 11:24:33 2026 -0800

    Replaces AtomicBoolean with BooleanSupplier in fate (#6143)
---
 .../java/org/apache/accumulo/core/fate/AbstractFateStore.java     | 8 ++++----
 .../src/main/java/org/apache/accumulo/core/fate/FateExecutor.java | 2 +-
 core/src/main/java/org/apache/accumulo/core/fate/FateStore.java   | 5 +++--
 .../java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java     | 4 ++--
 .../main/java/org/apache/accumulo/core/logging/FateLogger.java    | 4 ++--
 core/src/test/java/org/apache/accumulo/core/fate/TestStore.java   | 4 ++--
 .../main/java/org/apache/accumulo/test/fate/FateStoreITBase.java  | 4 ++--
 7 files changed, 16 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 751e3d42a4..14755be12b 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -161,11 +162,11 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
       EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);
 
   @Override
-  public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
+  public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
 
     AtomicLong seen = new AtomicLong(0);
 
-    while (keepWaiting.get() && seen.get() == 0) {
+    while (keepWaiting.getAsBoolean() && seen.get() == 0) {
       final long beforeCount = unreservedRunnableCount.getCount();
       final boolean beforeDeferredOverflow = deferredOverflow.get();
 
@@ -207,8 +208,7 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
           }
 
           if (waitTime > 0) {
-            unreservedRunnableCount.waitFor(count -> count != beforeCount, 
waitTime,
-                keepWaiting::get);
+            unreservedRunnableCount.waitFor(count -> count != beforeCount, 
waitTime, keepWaiting);
           }
         }
       }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index bbf0bcb81e..883146eb4e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -308,7 +308,7 @@ public class FateExecutor<T> {
     public void run() {
       while (fate.getKeepRunning().get() && !isShutdown()) {
         try {
-          fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> {
+          fate.getStore().runnable(() -> fate.getKeepRunning().get(), 
fateIdStatus -> {
             // The FateId with the fate operation 'fateOp' is workable by this 
FateExecutor if
             // 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' 
is in 'fateOps')
             // 2) The transaction was cancelled while NEW. This is an edge 
case that needs to be
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index e28e793642..6756f84f5a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -29,6 +29,8 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -151,8 +153,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T>, 
AutoCloseable {
      * longer interact with it.
      *
      * @param deferTime time to keep this transaction from being returned by
-     *        {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, 
java.util.function.Consumer)}.
-     *        Must be non-negative.
+     *        {@link #runnable(BooleanSupplier, Consumer)}. Must be 
non-negative.
      */
     void unreserve(Duration deferTime);
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index 263a9b090b..451823ac70 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -23,7 +23,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
@@ -163,7 +163,7 @@ public interface ReadOnlyFateStore<T> {
    * is found or until the keepWaiting parameter is false. It will return once 
all runnable ids
    * found were passed to the consumer.
    */
-  void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer);
+  void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> 
idConsumer);
 
   /**
    * Returns true if the deferred map was cleared and if deferred executions 
are currently disabled
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 331401fc6b..8c7a956c05 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -140,7 +140,7 @@ public class FateLogger {
       }
 
       @Override
-      public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
+      public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
         store.runnable(keepWaiting, idConsumer);
       }
 
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index e4d057fc10..86a4106c61 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -30,7 +30,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
@@ -273,7 +273,7 @@ public class TestStore implements FateStore<String> {
   }
 
   @Override
-  public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
+  public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> 
idConsumer) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
index 2198b73767..9fd3ba769d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
@@ -199,7 +199,7 @@ public abstract class FateStoreITBase extends 
SharedMiniClusterBase
     try {
       // Run and verify all 10 transactions still exist and were not
       // run because of the deferral time of all the transactions
-      future = executor.submit(() -> store.runnable(keepRunning,
+      future = executor.submit(() -> store.runnable(keepRunning::get,
           fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
       Thread.sleep(2000);
       assertEquals(10, transactions.size());
@@ -225,7 +225,7 @@ public abstract class FateStoreITBase extends 
SharedMiniClusterBase
       // Run and verify all 11 transactions were processed
       // and removed from the store
       keepRunning.set(true);
-      future = executor.submit(() -> store.runnable(keepRunning,
+      future = executor.submit(() -> store.runnable(keepRunning::get,
           fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
       Wait.waitFor(transactions::isEmpty);
       // Setting this flag to false should terminate the task if sleeping

Reply via email to