This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ec3050e813ec428236c58dde8788256ac7f1e2cd Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jan 17 15:57:00 2024 -0500 Exposes status when listing fate operations (#4169) The Accumulo store can very efficiently gather the status at the same time as the id for a FATE transaction. This is already being gathered internally in the implementation of the fate storage layer. This commit exposes this information further as it will be useful in refactoring the ageoff store. --- .../accumulo/core/fate/AbstractFateStore.java | 8 ++++---- .../org/apache/accumulo/core/fate/AdminUtil.java | 3 ++- .../org/apache/accumulo/core/fate/AgeOffStore.java | 4 ++-- .../accumulo/core/fate/ReadOnlyFateStore.java | 8 +++++++- .../org/apache/accumulo/core/fate/ZooStore.java | 2 +- .../accumulo/core/fate/accumulo/AccumuloStore.java | 2 +- .../apache/accumulo/core/logging/FateLogger.java | 2 +- .../apache/accumulo/core/fate/AgeOffStoreTest.java | 21 +++++++++++++-------- .../org/apache/accumulo/core/fate/TestStore.java | 14 ++++++++++++-- 9 files changed, 43 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 7125f692fe..d6cbf2780f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -171,8 +171,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { } @Override - public Stream<Long> list() { - return getTransactions().map(fateIdStatus -> fateIdStatus.txid); + public Stream<FateIdStatus> list() { + return getTransactions(); } @Override @@ -189,10 +189,10 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { return Long.parseLong(txdir.split("_")[1], 16); } - public static abstract class FateIdStatus { + public static abstract class FateIdStatusBase implements FateIdStatus { private final 
long txid; - public FateIdStatus(long txid) { + public FateIdStatusBase(long txid) { this.txid = txid; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index bbb7f42572..85bc34141c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; @@ -368,7 +369,7 @@ public class AdminUtil<T> { final List<TransactionStatus> statuses = new ArrayList<>(); fateStores.forEach((type, store) -> { - try (Stream<Long> tids = store.list()) { + try (Stream<Long> tids = store.list().map(FateIdStatus::getTxid)) { tids.forEach(tid -> { ReadOnlyFateTxStore<T> txStore = store.read(tid); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index c18b13f583..9470de9e18 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -129,7 +129,7 @@ public class AgeOffStore<T> implements FateStore<T> { // ELASTICITY_TODO need to rework how this class works so that it does not buffer everything in // memory. 
- List<Long> txids = store.list().collect(Collectors.toList()); + List<Long> txids = store.list().map(FateIdStatus::getTxid).collect(Collectors.toList()); for (Long txid : txids) { FateTxStore<T> txStore = store.reserve(txid); try { @@ -203,7 +203,7 @@ public class AgeOffStore<T> implements FateStore<T> { } @Override - public Stream<Long> list() { + public Stream<FateIdStatus> list() { return store.list(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index e525c89ff9..c5f7a9027c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -118,12 +118,18 @@ public interface ReadOnlyFateStore<T> { long getID(); } + interface FateIdStatus { + long getTxid(); + + TStatus getStatus(); + } + /** * list all transaction ids in store. * * @return all outstanding transactions, including those reserved by others. */ - Stream<Long> list(); + Stream<FateIdStatus> list(); /** * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index b4ad365dfa..31c299cf68 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -308,7 +308,7 @@ public class ZooStore<T> extends AbstractFateStore<T> { // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. 
Supplier<TStatus> statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid))); - return new FateIdStatus(parseTid(strTxid)) { + return new FateIdStatusBase(parseTid(strTxid)) { @Override public TStatus getStatus() { return statusSupplier.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 9b870537fa..4e81065f0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -82,7 +82,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - return new FateIdStatus(parseTid(e.getKey().getRow().toString())) { + return new FateIdStatusBase(parseTid(e.getKey().getRow().toString())) { @Override public TStatus getStatus() { return TStatus.valueOf(e.getValue().toString()); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index bac25c321b..ffd854bad4 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -115,7 +115,7 @@ public class FateLogger { } @Override - public Stream<Long> list() { + public Stream<FateIdStatus> list() { return store.list(); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index 88447771ec..5cc2671615 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import 
org.apache.accumulo.core.fate.AgeOffStore.TimeSource; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; @@ -75,19 +76,21 @@ public class AgeOffStoreTest { aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 30; aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); } @Test @@ -117,17 +120,19 @@ public class AgeOffStoreTest { AgeOffStore<String> aoStore = new AgeOffStore<>(testStore, 10, tts); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); aoStore.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1, txid2, txid3, txid4), + aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 15; aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); txStore1 = aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); @@ -137,7 +142,7 @@ public class AgeOffStoreTest { aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); txStore1 = 
aoStore.reserve(txid1); txStore1.setStatus(TStatus.FAILED); @@ -145,7 +150,7 @@ public class AgeOffStoreTest { aoStore.ageOff(); - assertEquals(Set.of(txid1), aoStore.list().collect(toSet())); + assertEquals(Set.of(txid1), aoStore.list().map(FateIdStatus::getTxid).collect(toSet())); tts.time = 42; diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 6dd6368d52..df1d711bae 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -167,8 +167,18 @@ public class TestStore implements FateStore<String> { } @Override - public Stream<Long> list() { - return new ArrayList<>(statuses.keySet()).stream(); + public Stream<FateIdStatus> list() { + return new ArrayList<>(statuses.entrySet()).stream().map(e -> new FateIdStatus() { + @Override + public long getTxid() { + return e.getKey(); + } + + @Override + public TStatus getStatus() { + return e.getValue(); + } + }); } @Override