This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f52c0b4dd4 Use EnumSet for TStatus instead of Set (#4929) f52c0b4dd4 is described below commit f52c0b4dd483fb1a4ff5029ab3efe4b7e8ce0f78 Author: Dom G. <domgargu...@apache.org> AuthorDate: Tue Oct 1 15:48:07 2024 -0400 Use EnumSet for TStatus instead of Set (#4929) * Use EnumSet for TStatus instead of Set * Remove Enum ALL_STATUSES set within the TStatus Enum --- .../org/apache/accumulo/core/fate/AbstractFateStore.java | 13 ++++++------- .../org/apache/accumulo/core/fate/MetaFateStore.java | 15 +++++++-------- .../org/apache/accumulo/core/fate/ReadOnlyFateStore.java | 8 +------- .../accumulo/core/fate/user/RowFateStatusFilter.java | 7 +++---- .../apache/accumulo/core/fate/user/UserFateStore.java | 4 ++-- .../org/apache/accumulo/core/logging/FateLogger.java | 4 ++-- .../java/org/apache/accumulo/core/fate/TestStore.java | 2 +- .../java/org/apache/accumulo/test/fate/FateStoreIT.java | 16 +++++++++++----- .../apache/accumulo/test/fate/user/UserFateStoreIT.java | 10 +++++----- 9 files changed, 38 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 96d5805e6e..a499e07950 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -34,7 +34,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -151,9 +150,9 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { return reserveAttempt.orElseThrow(); } - private static final Set<TStatus> IN_PROGRESS_SET = Set.of(TStatus.IN_PROGRESS); - private static final Set<TStatus> OTHER_RUNNABLE_SET = - Set.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS); + private static final EnumSet<TStatus> IN_PROGRESS_SET = EnumSet.of(TStatus.IN_PROGRESS); + private static final EnumSet<TStatus> OTHER_RUNNABLE_SET = + EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS); @Override public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) { @@ -219,11 +218,11 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { @Override public Stream<FateIdStatus> list() { - return getTransactions(TStatus.ALL_STATUSES); + return getTransactions(EnumSet.allOf(TStatus.class)); } @Override - public Stream<FateIdStatus> list(Set<TStatus> statuses) { + public Stream<FateIdStatus> list(EnumSet<TStatus> statuses) { return getTransactions(statuses); } @@ -276,7 +275,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { "Collision detected for fate id " + fateId); } - protected abstract Stream<FateIdStatus> getTransactions(Set<TStatus> statuses); + protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses); protected abstract TStatus _getStatus(FateId fateId); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java index d801167d07..555498d90d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.ALL_STATUSES; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -29,11 +28,11 @@ import java.io.Serializable; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.function.Predicate; import java.util.function.Supplier; @@ -503,7 +502,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } @Override - protected Stream<FateIdStatus> getTransactions(Set<TStatus> statuses) { + protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) { try { Stream<FateIdStatus> stream = zk.getChildren(path).stream().map(strTxid -> { String txUUIDStr = strTxid.split("_")[1]; @@ -525,11 +524,10 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { }; }); - if (!ALL_STATUSES.equals(statuses)) { - stream = stream.filter(s -> statuses.contains(s.getStatus())); + if (statuses.equals(EnumSet.allOf(TStatus.class))) { + return stream; } - - return stream; + return stream.filter(s -> statuses.contains(s.getStatus())); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -537,7 +535,8 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public Stream<FateKey> list(FateKey.FateKeyType type) { - return getTransactions(ALL_STATUSES).flatMap(fis -> getKey(fis.getFateId()).stream()) + return getTransactions(EnumSet.allOf(TStatus.class)) + .flatMap(fis -> getKey(fis.getFateId()).stream()) .filter(fateKey -> fateKey.getType() == type); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index bbb08f1b4b..8a7efd5f91 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -19,15 +19,12 @@ package org.apache.accumulo.core.fate; import java.io.Serializable; -import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -56,9 +53,6 @@ public interface ReadOnlyFateStore<T> { UNKNOWN, /** Transaction that is eligible to be executed */ SUBMITTED; - - public static final Set<TStatus> ALL_STATUSES = - Arrays.stream(values()).collect(Collectors.toUnmodifiableSet()); } /** @@ -148,7 +142,7 @@ public interface ReadOnlyFateStore<T> { * * @return all outstanding transactions, including those reserved by others. */ - Stream<FateIdStatus> list(Set<TStatus> statuses); + Stream<FateIdStatus> list(EnumSet<TStatus> statuses); /** * list transaction in the store that have a given fate key type. diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java index 61ac19eb4b..f6a6b3bef5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java @@ -18,13 +18,12 @@ */ package org.apache.accumulo.core.fate.user; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.ALL_STATUSES; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import java.io.IOException; import java.util.EnumSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; @@ -68,9 +67,9 @@ public class RowFateStatusFilter extends WholeRowIterator { } public static void configureScanner(ScannerBase scanner, - Set<ReadOnlyFateStore.TStatus> statuses) { + EnumSet<ReadOnlyFateStore.TStatus> statuses) { // only filter when getting a subset of statuses - if (!statuses.equals(ALL_STATUSES)) { + if (!statuses.equals(EnumSet.allOf(TStatus.class))) { String statusesStr = statuses.stream().map(Enum::name).collect(Collectors.joining(",")); var iterSettings = new IteratorSetting(100, "statuses", RowFateStatusFilter.class); iterSettings.addOption("statuses", statusesStr); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 1d45c170f6..a008e67473 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -21,11 +21,11 @@ package org.apache.accumulo.core.fate.user; import java.io.IOException; import java.io.Serializable; import java.time.Duration; +import java.util.EnumSet; import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.SortedMap; import java.util.UUID; import java.util.function.Function; @@ -286,7 +286,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } @Override - protected Stream<FateIdStatus> getTransactions(Set<TStatus> statuses) { + protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) { try { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 5722c61723..17a64541c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -19,9 +19,9 @@ package org.apache.accumulo.core.logging; import java.io.Serializable; +import java.util.EnumSet; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -123,7 +123,7 @@ public class FateLogger { } @Override - public Stream<FateIdStatus> list(Set<TStatus> statuses) { + public Stream<FateIdStatus> list(EnumSet<TStatus> statuses) { return store.list(statuses); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index ccefb8ea66..859fe5040a 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -234,7 +234,7 @@ public class TestStore implements FateStore<String> { } @Override - public Stream<FateIdStatus> list(Set<TStatus> statuses) { + public Stream<FateIdStatus> list(EnumSet<TStatus> statuses) { return list().filter(fis -> statuses.contains(fis.getStatus())); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 938025435f..64607cab7b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -244,12 +245,14 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT executeTest(this::testListStatus); } - protected void testListStatus(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + protected void testListStatus(FateStore<TestEnv> store, ServerContext sctx) { try { Map<FateId,TStatus> expectedStatus = new HashMap<>(); + final EnumSet<TStatus> allStatuses = EnumSet.allOf(TStatus.class); + for (int i = 0; i < 5; i++) { - for (var status : TStatus.values()) { + for (var status : allStatuses) { var fateId = store.create(); var txStore = store.reserve(fateId); txStore.setStatus(status); @@ -257,12 +260,15 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT expectedStatus.put(fateId, status); } } + for (Set<TStatus> statuses : Sets.powerSet(allStatuses)) { + EnumSet<TStatus> enumSet = + statuses.isEmpty() ? EnumSet.noneOf(TStatus.class) : EnumSet.copyOf(statuses); - for (var statuses : Sets.powerSet(Set.of(TStatus.values()))) { var expected = - expectedStatus.entrySet().stream().filter(e -> statuses.contains(e.getValue())) + expectedStatus.entrySet().stream().filter(e -> enumSet.contains(e.getValue())) .map(Map.Entry::getKey).collect(Collectors.toSet()); - var actual = store.list(statuses).map(FateIdStatus::getFateId).collect(Collectors.toSet()); + + var actual = store.list(enumSet).map(FateIdStatus::getFateId).collect(Collectors.toSet()); assertEquals(expected, actual); } } finally { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index be007a1f25..55f89cd605 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -23,9 +23,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.client.Accumulo; @@ -171,7 +171,7 @@ public class UserFateStoreIT extends SharedMiniClusterBase { } private void testOperationWithStatuses(Runnable beforeOperation, Executable operation, - Set<TStatus> acceptableStatuses) throws Exception { + EnumSet<TStatus> acceptableStatuses) throws Exception { for (TStatus status : TStatus.values()) { // Run any needed setup for the operation before each iteration beforeOperation.run(); @@ -192,7 +192,7 @@ public class UserFateStoreIT extends SharedMiniClusterBase { public void push() throws Exception { testOperationWithStatuses(() -> {}, // No special setup needed for push () -> txStore.push(new FateIT.TestRepo("testOp")), - Set.of(TStatus.IN_PROGRESS, TStatus.NEW)); + EnumSet.of(TStatus.IN_PROGRESS, TStatus.NEW)); } @Test @@ -205,14 +205,14 @@ public class UserFateStoreIT extends SharedMiniClusterBase { } catch (Exception e) { throw new RuntimeException("Failed to setup for pop", e); } - }, txStore::pop, Set.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL)); + }, txStore::pop, EnumSet.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL)); } @Test public void delete() throws Exception { testOperationWithStatuses(() -> {}, // No special setup needed for delete txStore::delete, - Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED)); + EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED)); } }