This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 8c4ddb30bc Replaces AtomicBoolean with BooleanSupplier in fate
(#6143)
8c4ddb30bc is described below
commit 8c4ddb30bc373920bf491018db8c349edfa0bfa2
Author: Keith Turner <[email protected]>
AuthorDate: Mon Feb 23 11:24:33 2026 -0800
Replaces AtomicBoolean with BooleanSupplier in fate (#6143)
---
.../java/org/apache/accumulo/core/fate/AbstractFateStore.java | 8 ++++----
.../src/main/java/org/apache/accumulo/core/fate/FateExecutor.java | 2 +-
core/src/main/java/org/apache/accumulo/core/fate/FateStore.java | 5 +++--
.../java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java | 4 ++--
.../main/java/org/apache/accumulo/core/logging/FateLogger.java | 4 ++--
core/src/test/java/org/apache/accumulo/core/fate/TestStore.java | 4 ++--
.../main/java/org/apache/accumulo/test/fate/FateStoreITBase.java | 4 ++--
7 files changed, 16 insertions(+), 15 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 751e3d42a4..14755be12b 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -161,11 +162,11 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);
@Override
- public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus>
idConsumer) {
+ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus>
idConsumer) {
AtomicLong seen = new AtomicLong(0);
- while (keepWaiting.get() && seen.get() == 0) {
+ while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();
@@ -207,8 +208,7 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
}
if (waitTime > 0) {
- unreservedRunnableCount.waitFor(count -> count != beforeCount,
waitTime,
- keepWaiting::get);
+ unreservedRunnableCount.waitFor(count -> count != beforeCount,
waitTime, keepWaiting);
}
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index bbf0bcb81e..883146eb4e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -308,7 +308,7 @@ public class FateExecutor<T> {
public void run() {
while (fate.getKeepRunning().get() && !isShutdown()) {
try {
- fate.getStore().runnable(fate.getKeepRunning(), fateIdStatus -> {
+ fate.getStore().runnable(() -> fate.getKeepRunning().get(),
fateIdStatus -> {
// The FateId with the fate operation 'fateOp' is workable by this
FateExecutor if
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp'
is in 'fateOps')
// 2) The transaction was cancelled while NEW. This is an edge
case that needs to be
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index e28e793642..6756f84f5a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -29,6 +29,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.hadoop.io.DataInputBuffer;
@@ -151,8 +153,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T>,
AutoCloseable {
* longer interact with it.
*
* @param deferTime time to keep this transaction from being returned by
- * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean,
java.util.function.Consumer)}.
- * Must be non-negative.
+ * {@link #runnable(BooleanSupplier, Consumer)}. Must be
non-negative.
*/
void unreserve(Duration deferTime);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index 263a9b090b..451823ac70 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -23,7 +23,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -163,7 +163,7 @@ public interface ReadOnlyFateStore<T> {
* is found or until the keepWaiting parameter is false. It will return once
all runnable ids
* found were passed to the consumer.
*/
- void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus> idConsumer);
+ void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus>
idConsumer);
/**
* Returns true if the deferred map was cleared and if deferred executions
are currently disabled
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 331401fc6b..8c7a956c05 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -140,7 +140,7 @@ public class FateLogger {
}
@Override
- public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus>
idConsumer) {
+ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus>
idConsumer) {
store.runnable(keepWaiting, idConsumer);
}
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index e4d057fc10..86a4106c61 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -30,7 +30,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -273,7 +273,7 @@ public class TestStore implements FateStore<String> {
}
@Override
- public void runnable(AtomicBoolean keepWaiting, Consumer<FateIdStatus>
idConsumer) {
+ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus>
idConsumer) {
throw new UnsupportedOperationException();
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
index 2198b73767..9fd3ba769d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java
@@ -199,7 +199,7 @@ public abstract class FateStoreITBase extends
SharedMiniClusterBase
try {
// Run and verify all 10 transactions still exist and were not
// run because of the deferral time of all the transactions
- future = executor.submit(() -> store.runnable(keepRunning,
+ future = executor.submit(() -> store.runnable(keepRunning::get,
fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
Thread.sleep(2000);
assertEquals(10, transactions.size());
@@ -225,7 +225,7 @@ public abstract class FateStoreITBase extends
SharedMiniClusterBase
// Run and verify all 11 transactions were processed
// and removed from the store
keepRunning.set(true);
- future = executor.submit(() -> store.runnable(keepRunning,
+ future = executor.submit(() -> store.runnable(keepRunning::get,
fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
Wait.waitFor(transactions::isEmpty);
// Setting this flag to false should terminate the task if sleeping